summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog46
-rw-r--r--ChangeLogs/ChangeLog-02a46
-rw-r--r--ChangeLogs/ChangeLog-03a46
-rw-r--r--THANKS3
-rw-r--r--ace/ACE.cpp62
-rw-r--r--ace/ACE.h10
-rw-r--r--ace/Asynch_Acceptor.cpp13
-rw-r--r--ace/Asynch_Acceptor.h10
-rw-r--r--ace/POSIX_Asynch_IO.cpp532
-rw-r--r--ace/POSIX_Asynch_IO.h129
-rw-r--r--ace/POSIX_Proactor.cpp493
-rw-r--r--ace/POSIX_Proactor.h53
-rw-r--r--ace/SUN_Proactor.cpp202
-rw-r--r--ace/SUN_Proactor.h19
14 files changed, 1006 insertions, 658 deletions
diff --git a/ChangeLog b/ChangeLog
index efba375ef71..3f69f38b430 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -9,6 +9,21 @@ Tue Jan 2 10:25:30 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Edan Ayal <edan@bandwiz.com> for
ACE_Bounded_Cached_Connect_Strategy::find_or_create_svc_handler_i ().
+Tue Jan 2 09:03:18 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+
+ * ace/Asynch_Acceptor.{h,cpp},
+ ace/POSIX_Async_IO.{h,cpp},
+ ace/POSIX_Proactor.{h,cpp},
+ ace/SUN_Proactor.{h,cpp}: Added enhanced versions of ACE_POSIX_Proactor
+ and ACE_SUN_Proactor to fix various problems uncovered and fixed by
+ Alexander Libman <Alibman@baltimore.com>.
+
+ * ace/ACE.{h,cpp}: Fixed the timestamp() method so that it works
+ identically on Win32 and on UNIX. Also provided a new option
+ that'll make it possible to return a pointer to the beginning of
+ the time portion of "date and time." Thanks to Michael Searles
+ <msearles@base16.com> for contributing these fixes.
+
Mon Jan 01 16:35:00 2000 Michael Kircher <Micahel.Kircher@mchp.siemens.de>
* tests/Reader_Writer_Test.cpp: Added a #ifdef to check for native
@@ -219,28 +234,7 @@ Thu Dec 21 16:45:28 2000 Steve Huston <shuston@riverace.com>
removed to satisfy Win32 which now pays attention to that arg.
Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
-<<<<<<< ChangeLog
-=======
-
- * ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where
- next_earliest_time should have been next_earliest. Thanks to
- Keith Brown <kalbrown@ix.netcom.com> for reporting this.
-
- * examples/IPC_SAP/SOCK_SAP/Makefile.bor: Reenable CPP-inserver-fancy.cpp
- now that it compiles! Thanks to Johnny Willemsen
- <johnny.willemsen@meco.nl> for confirming this.
-
- * examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.cpp (handle_events): Added
- casts for (SOCKET) when using FD_SET to work around problems
- with Borland. Thanks to Johnny Willemsen
- <johnny.willemsen@meco.nl> for reporting this.
- * ace/Message_Block.h (ACE_Dynamic_Message_Strategy): Fixed a bunch
- of non-const accessor methods to be const. Thanks to Johnny
- Willemsen <johnny.willemsen@meco.nl> for reporting this.
->>>>>>> 4.1393
-
-<<<<<<< ChangeLog
* ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where
next_earliest_time should have been next_earliest. Thanks to
Keith Brown <kalbrown@ix.netcom.com> for reporting this.
@@ -266,16 +260,6 @@ Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
broadcast() don't release the mutex. Thanks to Mike Curtis
<mccurry@my-deja.com> for pointing this out.
-=======
- * examples/Naming/Makefile (BIN2): Added the test_open.cpp file
- to the Makefile. Thanks to Johnny Willemsen
- <johnny.willemsen@meco.nl> for reporting this.
-
- * docs/tutorials/016/page02.html: Clarify that signal() or
- broadcast() don't release the mutex. Thanks to Mike Curtis
- <mccurry@my-deja.com> for pointing this out.
-
->>>>>>> 4.1393
Wed Dec 20 19:44:16 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
* examples/Reactor/WFMO_Reactor/test_abandoned.cpp,
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index efba375ef71..3f69f38b430 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -9,6 +9,21 @@ Tue Jan 2 10:25:30 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Edan Ayal <edan@bandwiz.com> for
ACE_Bounded_Cached_Connect_Strategy::find_or_create_svc_handler_i ().
+Tue Jan 2 09:03:18 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+
+ * ace/Asynch_Acceptor.{h,cpp},
+ ace/POSIX_Async_IO.{h,cpp},
+ ace/POSIX_Proactor.{h,cpp},
+ ace/SUN_Proactor.{h,cpp}: Added enhanced versions of ACE_POSIX_Proactor
+ and ACE_SUN_Proactor to fix various problems uncovered and fixed by
+ Alexander Libman <Alibman@baltimore.com>.
+
+ * ace/ACE.{h,cpp}: Fixed the timestamp() method so that it works
+ identically on Win32 and on UNIX. Also provided a new option
+ that'll make it possible to return a pointer to the beginning of
+ the time portion of "date and time." Thanks to Michael Searles
+ <msearles@base16.com> for contributing these fixes.
+
Mon Jan 01 16:35:00 2000 Michael Kircher <Micahel.Kircher@mchp.siemens.de>
* tests/Reader_Writer_Test.cpp: Added a #ifdef to check for native
@@ -219,28 +234,7 @@ Thu Dec 21 16:45:28 2000 Steve Huston <shuston@riverace.com>
removed to satisfy Win32 which now pays attention to that arg.
Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
-<<<<<<< ChangeLog
-=======
-
- * ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where
- next_earliest_time should have been next_earliest. Thanks to
- Keith Brown <kalbrown@ix.netcom.com> for reporting this.
-
- * examples/IPC_SAP/SOCK_SAP/Makefile.bor: Reenable CPP-inserver-fancy.cpp
- now that it compiles! Thanks to Johnny Willemsen
- <johnny.willemsen@meco.nl> for confirming this.
-
- * examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.cpp (handle_events): Added
- casts for (SOCKET) when using FD_SET to work around problems
- with Borland. Thanks to Johnny Willemsen
- <johnny.willemsen@meco.nl> for reporting this.
- * ace/Message_Block.h (ACE_Dynamic_Message_Strategy): Fixed a bunch
- of non-const accessor methods to be const. Thanks to Johnny
- Willemsen <johnny.willemsen@meco.nl> for reporting this.
->>>>>>> 4.1393
-
-<<<<<<< ChangeLog
* ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where
next_earliest_time should have been next_earliest. Thanks to
Keith Brown <kalbrown@ix.netcom.com> for reporting this.
@@ -266,16 +260,6 @@ Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
broadcast() don't release the mutex. Thanks to Mike Curtis
<mccurry@my-deja.com> for pointing this out.
-=======
- * examples/Naming/Makefile (BIN2): Added the test_open.cpp file
- to the Makefile. Thanks to Johnny Willemsen
- <johnny.willemsen@meco.nl> for reporting this.
-
- * docs/tutorials/016/page02.html: Clarify that signal() or
- broadcast() don't release the mutex. Thanks to Mike Curtis
- <mccurry@my-deja.com> for pointing this out.
-
->>>>>>> 4.1393
Wed Dec 20 19:44:16 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
* examples/Reactor/WFMO_Reactor/test_abandoned.cpp,
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index efba375ef71..3f69f38b430 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -9,6 +9,21 @@ Tue Jan 2 10:25:30 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Edan Ayal <edan@bandwiz.com> for
ACE_Bounded_Cached_Connect_Strategy::find_or_create_svc_handler_i ().
+Tue Jan 2 09:03:18 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+
+ * ace/Asynch_Acceptor.{h,cpp},
+ ace/POSIX_Async_IO.{h,cpp},
+ ace/POSIX_Proactor.{h,cpp},
+ ace/SUN_Proactor.{h,cpp}: Added enhanced versions of ACE_POSIX_Proactor
+ and ACE_SUN_Proactor to fix various problems uncovered and fixed by
+ Alexander Libman <Alibman@baltimore.com>.
+
+ * ace/ACE.{h,cpp}: Fixed the timestamp() method so that it works
+ identically on Win32 and on UNIX. Also provided a new option
+ that'll make it possible to return a pointer to the beginning of
+ the time portion of "date and time." Thanks to Michael Searles
+ <msearles@base16.com> for contributing these fixes.
+
Mon Jan 01 16:35:00 2000 Michael Kircher <Micahel.Kircher@mchp.siemens.de>
* tests/Reader_Writer_Test.cpp: Added a #ifdef to check for native
@@ -219,28 +234,7 @@ Thu Dec 21 16:45:28 2000 Steve Huston <shuston@riverace.com>
removed to satisfy Win32 which now pays attention to that arg.
Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
-<<<<<<< ChangeLog
-=======
-
- * ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where
- next_earliest_time should have been next_earliest. Thanks to
- Keith Brown <kalbrown@ix.netcom.com> for reporting this.
-
- * examples/IPC_SAP/SOCK_SAP/Makefile.bor: Reenable CPP-inserver-fancy.cpp
- now that it compiles! Thanks to Johnny Willemsen
- <johnny.willemsen@meco.nl> for confirming this.
-
- * examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.cpp (handle_events): Added
- casts for (SOCKET) when using FD_SET to work around problems
- with Borland. Thanks to Johnny Willemsen
- <johnny.willemsen@meco.nl> for reporting this.
- * ace/Message_Block.h (ACE_Dynamic_Message_Strategy): Fixed a bunch
- of non-const accessor methods to be const. Thanks to Johnny
- Willemsen <johnny.willemsen@meco.nl> for reporting this.
->>>>>>> 4.1393
-
-<<<<<<< ChangeLog
* ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where
next_earliest_time should have been next_earliest. Thanks to
Keith Brown <kalbrown@ix.netcom.com> for reporting this.
@@ -266,16 +260,6 @@ Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
broadcast() don't release the mutex. Thanks to Mike Curtis
<mccurry@my-deja.com> for pointing this out.
-=======
- * examples/Naming/Makefile (BIN2): Added the test_open.cpp file
- to the Makefile. Thanks to Johnny Willemsen
- <johnny.willemsen@meco.nl> for reporting this.
-
- * docs/tutorials/016/page02.html: Clarify that signal() or
- broadcast() don't release the mutex. Thanks to Mike Curtis
- <mccurry@my-deja.com> for pointing this out.
-
->>>>>>> 4.1393
Wed Dec 20 19:44:16 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
* examples/Reactor/WFMO_Reactor/test_abandoned.cpp,
diff --git a/THANKS b/THANKS
index 077c003d7de..ea0eb732953 100644
--- a/THANKS
+++ b/THANKS
@@ -1105,7 +1105,8 @@ Kent Stewart <kbstew99@hotmail.com>
Alexander Kogan <jk@kogan.nnov.ru>
Michael Lindner <mikel@att.net>
Arnaud Compan <compan@ipanematech.com>
-Frank A. Hunleth" <fhunleth@cs.wustl.edu>
+Frank A. Hunleth <fhunleth@cs.wustl.edu>
+Michael Searles <msearles@base16.com>
Bogdan Jeram <bjeram@eso.org>
I would particularly like to thank Paul Stephenson, who worked with me
diff --git a/ace/ACE.cpp b/ace/ACE.cpp
index a8c83390d63..c1a235385e5 100644
--- a/ace/ACE.cpp
+++ b/ace/ACE.cpp
@@ -2394,13 +2394,12 @@ ACE::format_hexdump (const char *buffer,
// Returns the current timestamp in the form
// "hour:minute:second:microsecond." The month, day, and year are
-// also stored in the beginning of the date_and_time array. Returns 0
-// if unsuccessful, else returns pointer to beginning of the "time"
-// portion of <day_and_time>.
+// also stored in the beginning of the date_and_time array.
ACE_TCHAR *
ACE::timestamp (ACE_TCHAR date_and_time[],
- int date_and_timelen)
+ int date_and_timelen,
+ int return_pointer_to_first_digit)
{
//ACE_TRACE ("ACE::timestamp");
@@ -2411,22 +2410,43 @@ ACE::timestamp (ACE_TCHAR date_and_time[],
}
#if defined (WIN32)
- // @@ Jesper, I think Win32 supports all the UNIX versions below.
- // Therefore, we can probably remove this WIN32 ifdef altogether.
- SYSTEMTIME local;
- ::GetLocalTime (&local);
-
- ACE_OS::sprintf (date_and_time,
- ACE_LIB_TEXT ("%02d/%02d/%04d %02d.%02d.%02d.%06d"),
- (int) local.wMonth, // new, also the %02d in sprintf
- (int) local.wDay, // new, also the %02d in sprintf
- (int) local.wYear, // new, also the %02d in sprintf
- (int) local.wHour,
- (int) local.wMinute,
- (int) local.wSecond,
- (int) local.wMilliseconds * 1000);
- date_and_time[26] = '\0';
- return &date_and_time[11];
+ // Emulate Unix. Win32 does NOT support all the UNIX versions
+ // below, so DO we need this ifdef.
+ static const char *day_of_week_name[] = {
+ ACE_LIB_TEXT ("Sun"),
+ ACE_LIB_TEXT ("Mon"),
+ ACE_LIB_TEXT ("Tue"),
+ ACE_LIB_TEXT ("Wed"),
+ ACE_LIB_TEXT ("Thr"),
+ ACE_LIB_TEXT ("Fri"),
+ ACE_LIB_TEXT ("Sat")
+ };
+
+ static const char *month_name[] = {
+ ACE_LIB_TEXT ("Jan"),
+ ACE_LIB_TEXT ("Feb"),
+ ACE_LIB_TEXT ("Mar"),
+ ACE_LIB_TEXT ("Apr"),
+ ACE_LIB_TEXT ("May"),
+ ACE_LIB_TEXT ("Jun"),
+ ACE_LIB_TEXT ("Jul"),
+ ACE_LIB_TEXT ("Aug"),
+ ACE_LIB_TEXT ("Sep"),
+ ACE_LIB_TEXT ("Oct"),
+ ACE_LIB_TEXT ("Nov"),
+ ACE_LIB_TEXT ("Dec") };
+
+ ACE_OS::sprintf (date_and_time,
+ ACE_LIB_TEXT ("%3s %3s %2d %04d %02d:%02d:%02d.%06d"),
+ day_of_week_name[local.wDayOfWeek],
+ month_name[local.wMonth - 1],
+ (int) local.wDay,
+ (int) local.wYear,
+ (int) local.wHour,
+ (int) local.wMinute,
+ (int) local.wSecond,
+ (int) (local.wMilliseconds * 1000));
+ return &date_and_time[15 + (return_pointer_to_first_digit != 0)];
#else /* UNIX */
ACE_TCHAR timebuf[26]; // This magic number is based on the ctime(3c) man page.
ACE_Time_Value cur_time = ACE_OS::gettimeofday ();
@@ -2454,7 +2474,7 @@ ACE::timestamp (ACE_TCHAR date_and_time[],
timetmp,
cur_time.usec ());
date_and_time[33] = '\0';
- return &date_and_time[15];
+ return &date_and_time[15 + (return_pointer_to_first_digit != 0)];
#endif /* WIN32 */
}
diff --git a/ace/ACE.h b/ace/ACE.h
index 4e52189602b..a5f3601d579 100644
--- a/ace/ACE.h
+++ b/ace/ACE.h
@@ -35,7 +35,7 @@ class ACE_Message_Block;
* @class ACE
*
* @brief Contains value added ACE methods that extend the behavior
- * of the UNIX and Win32 OS calls.
+ * of the UNIX and Win32 OS calls.
*
* This class consolidates all these ACE static methods in a
* single place in order to manage the namespace better. These
@@ -408,10 +408,14 @@ public:
* "hour:minute:second:microsecond." The month, day, and year are
* also stored in the beginning of the date_and_time array. Returns
* 0 if unsuccessful, else returns pointer to beginning of the
- * "time" portion of <day_and_time>.
+ * "time" portion of <day_and_time>. If
+ * <return_pointer_to_first_digit> is 0 then return a pointer to the
+ * space before the time, else return a pointer to the beginning of
+ * the time portion.
*/
static ACE_TCHAR *timestamp (ACE_TCHAR date_and_time[],
- int time_len);
+ int time_len,
+ int return_pointer_to_first_digit = 0);
/**
* if <avoid_zombies> == 0 call <ACE_OS::fork> directly, else create
diff --git a/ace/Asynch_Acceptor.cpp b/ace/Asynch_Acceptor.cpp
index 1da5c30086d..c32b8b330ed 100644
--- a/ace/Asynch_Acceptor.cpp
+++ b/ace/Asynch_Acceptor.cpp
@@ -1,3 +1,4 @@
+/* -*- C++ -*- */
// $Id$
#ifndef ACE_ASYNCH_ACCEPTOR_C
@@ -183,8 +184,8 @@ ACE_Asynch_Acceptor<HANDLER>::handle_accept (const ACE_Asynch_Accept::Result &re
int error = 0;
// If the asynchronous accept fails.
- if (!error &&
- !result.success ())
+ if (!result.success () ||
+ result.accept_handle() == ACE_INVALID_HANDLE )
{
error = 1;
ACE_ERROR ((LM_ERROR,
@@ -268,7 +269,8 @@ ACE_Asynch_Acceptor<HANDLER>::handle_accept (const ACE_Asynch_Accept::Result &re
}
// On failure, no choice but to close the socket
- if (error)
+ if (error &&
+ result.accept_handle() != ACE_INVALID_HANDLE )
ACE_OS::closesocket (result.accept_handle ());
// Delete the dynamically allocated message_block
@@ -300,7 +302,10 @@ ACE_Asynch_Acceptor<HANDLER>::cancel (void)
|| (defined (__BORLANDC__) && (__BORLANDC__ >= 0x530)))
return (int) ::CancelIo (this->listen_handle_);
#else
- ACE_NOTSUP_RETURN (-1);
+ //ACE_NOTSUP_RETURN (-1);
+ // Supported now
+ return this->asynch_accept_.cancel();
+
#endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && ((defined (_MSC_VER) && (_MSC_VER > 1020)) || (defined (__BORLANDC__) && (__BORLANDC__ >= 0x530))) */
}
diff --git a/ace/Asynch_Acceptor.h b/ace/Asynch_Acceptor.h
index 835ed63ec5b..cd8b286b6c3 100644
--- a/ace/Asynch_Acceptor.h
+++ b/ace/Asynch_Acceptor.h
@@ -1,3 +1,4 @@
+/* -*- C++ -*- */
//=============================================================================
/**
@@ -9,7 +10,6 @@
*/
//=============================================================================
-
#ifndef ACE_ASYNCH_ACCEPTOR_H
#define ACE_ASYNCH_ACCEPTOR_H
#include "ace/pre.h"
@@ -94,8 +94,12 @@ public:
/**
* This cancels all pending accepts operations that were issued by
- * the calling thread. The function does not cancel accept
- * operations issued by other threads.
+ * the calling thread.
+ * Windows NT- The function does not cancel accept operations
+ * issued by other threads
+ * POSIX - all OK, it delegates cancelation to the
+ * ACE_POSIX_Asynch_Accept
+ *
*/
virtual int cancel (void);
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp
index 3c279596cb4..366fbf31a10 100644
--- a/ace/POSIX_Asynch_IO.cpp
+++ b/ace/POSIX_Asynch_IO.cpp
@@ -1,3 +1,4 @@
+/* -*- C++ -*- */
// $Id$
#include "ace/POSIX_Asynch_IO.h"
@@ -20,6 +21,12 @@ ACE_POSIX_Asynch_Result::bytes_transferred (void) const
return this->bytes_transferred_;
}
+void
+ACE_POSIX_Asynch_Result::set_bytes_transferred (u_long nbytes)
+{
+ this->bytes_transferred_= nbytes;
+}
+
const void *
ACE_POSIX_Asynch_Result::act (void) const
{
@@ -44,6 +51,11 @@ ACE_POSIX_Asynch_Result::error (void) const
return this->error_;
}
+void
+ACE_POSIX_Asynch_Result::set_error (u_long errcode)
+{
+ this->error_=errcode;
+}
ACE_HANDLE
ACE_POSIX_Asynch_Result::event (void) const
{
@@ -114,7 +126,7 @@ ACE_POSIX_Asynch_Result::ACE_POSIX_Asynch_Result (ACE_Handler &handler,
aio_sigevent.sigev_signo = signal_number;
// Event is not used on POSIX.
-ACE_UNUSED_ARG (event);
+ ACE_UNUSED_ARG (event);
//
// @@ Support offset_high with aiocb64.
@@ -178,12 +190,27 @@ ACE_POSIX_Asynch_Operation::cancel (void)
if (!p_impl)
return -1;
- // For <ACE_SUN_Proactor> this function is not implemented yet. We
- // should call aiocancel instead of aio_cancel but we have not got
- // information about aio_result_t.
+ // For ACE_SUN_Proactor and ACE_POSIX_AIOCB_Proactor
+ // we should call cancel_aio (this->handle_)
+ // method to cancel correctly all deferred AIOs
- if (p_impl->get_impl_type () == ACE_POSIX_Proactor::PROACTOR_SUN)
- return -1;
+ switch ( p_impl->get_impl_type ())
+ {
+ case ACE_POSIX_Proactor::PROACTOR_SUN:
+ case ACE_POSIX_Proactor::PROACTOR_AIOCB:
+ {
+ ACE_POSIX_AIOCB_Proactor * p_impl_aiocb = ACE_dynamic_cast
+ (ACE_POSIX_AIOCB_Proactor *,
+ p_impl );
+
+ if ( ! p_impl_aiocb)
+ return -1;
+
+ return p_impl_aiocb->cancel_aio (this->handle_);
+ }
+ default:
+ break;
+ }
int result = ::aio_cancel (this->handle_, 0);
@@ -1680,18 +1707,16 @@ ACE_POSIX_Asynch_Accept_Result::post_completion (ACE_Proactor_Impl *proactor)
class ACE_Export ACE_POSIX_Asynch_Accept_Handler : public ACE_Event_Handler
{
// = TITLE
- // For the POSIX implementation, we have two helper classes
- // (ACE_POSIX_AIOCB_Asynch_Accept_Hander and
- // ACE_POSIX_SIG_Asynch_Accept_Handler) to do <Asynch_Accept>. This
- // class abstracts out the commonalities on these two helper classes.
+ // For the POSIX implementation this class is common
+ // for all Proactors (AIOCB/SIG/SUN)
//
// = DESCRIPTION
//
+
public:
~ACE_POSIX_Asynch_Accept_Handler (void);
// Destructor.
-protected:
ACE_POSIX_Asynch_Accept_Handler (ACE_Reactor* reactor,
ACE_POSIX_Proactor *posix_proactor);
// Constructor. Give the reactor so that it can activate/deactivate
@@ -1699,22 +1724,29 @@ protected:
// handler can send the <POSIX_Asynch_Accept> result block through
// <post_completion>.
- int register_accept_call_i (ACE_POSIX_Asynch_Accept_Result* result);
+ int cancel_uncompleted (int flg_notify);
+ // flg_notify points whether or not we should send notification about
+ // canceled accepts
+
+
+ int register_accept_call (ACE_POSIX_Asynch_Accept_Result* result);
// Worker method for registering this <accept> call with the local
- // handler. This method has the common code found between the two
- // differnt implementation subclasses. This method assumes that
- // locks are already obtained to access the shared the queues.
+ // handler. This method obtains lock to access the shared the queues.
ACE_POSIX_Asynch_Accept_Result* deregister_accept_call (void);
// Method for deregistering.
+ int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
+ // Called when accept event comes up on <listen_hanlde>
+
+protected:
ACE_Reactor* reactor_;
// Reactor used by the Asynch_Accept. We need this here to enable
// and disable the <handle> now and then, depending on whether any
// <accept> is pending or no.
ACE_POSIX_Proactor *posix_proactor_;
- // POSIX_Proactor.
+ // POSIX_Proactor implementation.
ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result*> result_queue_;
// Queue of Result pointers that correspond to all the <accept>'s
@@ -1729,75 +1761,27 @@ protected:
// *********************************************************************
-class ACE_Export ACE_POSIX_AIOCB_Asynch_Accept_Handler : public ACE_POSIX_Asynch_Accept_Handler
-{
- // = TITLE
- // For the POSIX implementation, this class takes care of doing
- // <Asynch_Accept> for AIOCB strategy.
- //
- // = DESCRIPTION
- //
-public:
- ACE_POSIX_AIOCB_Asynch_Accept_Handler (ACE_Reactor *reactor,
- ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
- // Constructor. Give the reactor so that it can activate/deactivate
- // the handlers. Give also the proactor used here, so that the
- // handler can send information through the notification pipe
- // (<post_completion>).
-
- ~ACE_POSIX_AIOCB_Asynch_Accept_Handler (void);
- // Destructor.
-
- int register_accept_call (ACE_POSIX_Asynch_Accept_Result* result);
- // Register this <accept> call with the local handler.
-
- virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
- // Called when accept event comes up on the <listen_handle>.
-};
-
-// *********************************************************************
-
-class ACE_Export ACE_POSIX_SIG_Asynch_Accept_Handler : public ACE_POSIX_Asynch_Accept_Handler
-{
- // = TITLE
- // For the POSIX implementation, this class takes care of doing
- // Asynch_Accept.
- //
- // = DESCRIPTION
- //
-public:
- ACE_POSIX_SIG_Asynch_Accept_Handler (ACE_Reactor* reactor,
- ACE_POSIX_SIG_Proactor *posix_sig_proactor);
- // Constructor. Give the reactor so that it can activate/deactivate
- // the handlers. Give also the proactor used here, so that the
- // handler can send information through <post_completion>.
-
- ~ACE_POSIX_SIG_Asynch_Accept_Handler (void);
- // Destructor.
-
- int register_accept_call (ACE_POSIX_Asynch_Accept_Result *result);
- // Register this <accept> call with the local handler.
-
- virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
- // Called when accept event comes up on the <listen_handle>.
-};
-
-// *********************************************************************
-
ACE_POSIX_Asynch_Accept_Handler::ACE_POSIX_Asynch_Accept_Handler (ACE_Reactor* reactor,
ACE_POSIX_Proactor *posix_proactor)
: reactor_ (reactor),
posix_proactor_ (posix_proactor)
{
+ ACE_TRACE ("ACE_POSIX_Asynch_Accept_Handler::ctor");
}
ACE_POSIX_Asynch_Accept_Handler::~ACE_POSIX_Asynch_Accept_Handler (void)
{
+ ACE_TRACE ("ACE_POSIX_Asynch_Accept_Handler::dtor");
}
int
-ACE_POSIX_Asynch_Accept_Handler::register_accept_call_i (ACE_POSIX_Asynch_Accept_Result* result)
+ACE_POSIX_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Accept_Result* result)
{
+ // The queue is updated by main thread in the register function call
+ // and thru the auxillary thread in the deregister fun. So let us
+ // mutex it.
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
// Insert this result to the queue.
int insert_result = this->result_queue_.enqueue_tail (result);
if (insert_result == -1)
@@ -1830,7 +1814,7 @@ ACE_POSIX_Asynch_Accept_Handler::deregister_accept_call (void)
// The queue is updated by main thread in the register function call and
// thru the auxillary thread in the deregister fun. So let us mutex
// it.
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0);
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
// Get the first item (result ptr) from the Queue.
ACE_POSIX_Asynch_Accept_Result* result = 0;
@@ -1858,102 +1842,60 @@ ACE_POSIX_Asynch_Accept_Handler::deregister_accept_call (void)
return result;
}
-// *********************************************************************
-
-ACE_POSIX_AIOCB_Asynch_Accept_Handler::ACE_POSIX_AIOCB_Asynch_Accept_Handler (ACE_Reactor* reactor,
- ACE_POSIX_AIOCB_Proactor* posix_aiocb_proactor)
- : ACE_POSIX_Asynch_Accept_Handler (reactor, posix_aiocb_proactor)
-{
-}
-
-ACE_POSIX_AIOCB_Asynch_Accept_Handler::~ACE_POSIX_AIOCB_Asynch_Accept_Handler (void)
-{
-}
+//@@ New method cancel_uncompleted
+// It performs cancellation of all pending requests
+//
+// Parameter flg_notify can be
+// 0 - don't send notifications about canceled accepts
+// !0 - notify user about canceled accepts
+// according POSIX standards we should receive notifications
+// on canceled AIO requests
+//
+// Return value : number of cancelled requests
+//
+
int
-ACE_POSIX_AIOCB_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Accept_Result* result)
+ACE_POSIX_Asynch_Accept_Handler::cancel_uncompleted (int flg_notify)
{
- // The queue is updated by main thread in the register function call
- // and thru the auxillary thread in the deregister fun. So let us
- // mutex it.
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
- return register_accept_call_i (result);
-}
-
-int
-ACE_POSIX_AIOCB_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */)
-{
- // An <accept> has been sensed on the <listen_handle>. We should be
- // able to just go ahead and do the <accept> now on this <fd>. This
- // should be the same as the <listen_handle>.
-
- // Deregister this info pertaining to this <accept> call.
- ACE_POSIX_Asynch_Accept_Result* result = this->deregister_accept_call ();
- if (result == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:(%P | %t):%p\n",
- "ACE_POSIX_AIOCB_Asynch_Accept_Handler::"
- "handle_input:deregister_accept_call failed"),
- -1);
-
- // Issue <accept> now.
- // @@ We shouldnt block here since we have already done poll/select
- // thru reactor. But are we sure?
- ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), 0, 0);
- if (new_handle == ACE_INVALID_HANDLE)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:(%P | %t):%p\n",
- "ACE_POSIX_AIOCB_Asynch_Accept_Handler::"
- "handle_input:<accept> system call failed"),
- -1);
+ int retval = 0;
- // Accept has completed.
-
- // Store the new handle.
- result->aio_fildes = new_handle;
-
- // Notify the main process about this completion
- // Send the Result through the notification pipe.
- if (this->posix_proactor_->post_completion (result) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:(%P | %t):%p\n",
- "ACE_POSIX_AIOCB_Asynch_Accept_Handler::"
- "handle_input:<post_completion> failed"),
- -1);
+ for ( ; ; retval++ )
+ {
+ ACE_POSIX_Asynch_Accept_Result* result = 0;
- return 0;
-}
+ this->result_queue_.dequeue_head (result);
-// *********************************************************************
+ if (result == 0)
+ break;
-ACE_POSIX_SIG_Asynch_Accept_Handler::ACE_POSIX_SIG_Asynch_Accept_Handler (ACE_Reactor* reactor,
- ACE_POSIX_SIG_Proactor *posix_sig_proactor)
- : ACE_POSIX_Asynch_Accept_Handler (reactor, posix_sig_proactor)
-{
-}
+ this->reactor_->suspend_handler (result->listen_handle ());
+
+ if ( ! flg_notify ) //if we should not notify
+ delete result ; // we have to delete result
+ else //else notify as any cancelled AIO
+ {
+ // Store the new handle.
+ result->aio_fildes = ACE_INVALID_HANDLE ;
+ result->set_bytes_transferred (0);
+ result->set_error (ECANCELED);
+
+ if (this->posix_proactor_->post_completion (result) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "ACE_POSIX_Asynch_Accept_Handler::"
+ "cancel_uncompleted:<post_completion> failed"));
+ }
+ }
-ACE_POSIX_SIG_Asynch_Accept_Handler::~ACE_POSIX_SIG_Asynch_Accept_Handler (void)
-{
+ return retval;
}
-int
-ACE_POSIX_SIG_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Accept_Result* result)
-{
- // The queue is updated by main thread in the register function call
- // and thru the auxillary thread in the deregister fun. So let us
- // mutex it.
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
-
- // Do the work.
- if (this->register_accept_call_i (result) == -1)
- return -1;
-
- return 0;
-}
int
-ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */)
+ACE_POSIX_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */)
{
// An <accept> has been sensed on the <listen_handle>. We should be
// able to just go ahead and do the <accept> now on this <fd>. This
@@ -1964,7 +1906,7 @@ ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */)
if (result == 0)
ACE_ERROR_RETURN ((LM_ERROR,
"%N:%l:(%P | %t):%p\n",
- "ACE_POSIX_SIG_Asynch_Accept_Handler::"
+ "ACE_POSIX_AIOCB_Asynch_Accept_Handler::"
"handle_input:deregister_accept_call failed"),
-1);
@@ -1973,40 +1915,46 @@ ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */)
// thru reactor. But are we sure?
ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), 0, 0);
if (new_handle == ACE_INVALID_HANDLE)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:(%P | %t):%p\n",
- "ACE_POSIX_SIG_Asynch_Accept_Handler::"
- "handle_input:<accept> system call failed"),
- -1);
+ {
+ result->set_error( errno );
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t):%p\n",
+ "ACE_POSIX_AIOCB_Asynch_Accept_Handler::"
+ "handle_input:<accept> system call failed"));
- // Accept has completed.
+ // Notify client as usual, "AIO" finished with errors
+ }
// Store the new handle.
result->aio_fildes = new_handle;
- // Notify the main process about this completion.
+ // Notify the main process about this completion
+ // Send the Result through the notification pipe.
if (this->posix_proactor_->post_completion (result) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"Error:(%P | %t):%p\n",
- "ACE_POSIX_SIG_Asynch_Accept_Handler::"
+ "ACE_POSIX_AIOCB_Asynch_Accept_Handler::"
"handle_input:<post_completion> failed"),
-1);
return 0;
}
+
// *********************************************************************
-ACE_POSIX_AIOCB_Asynch_Accept::ACE_POSIX_AIOCB_Asynch_Accept (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
+ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor *posix_proactor)
: ACE_Asynch_Operation_Impl (),
ACE_Asynch_Accept_Impl (),
- ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor),
- accept_handler_ (0)
+ ACE_POSIX_Asynch_Operation (),
+ accept_handler_ (0),
+ grp_id_(-1), //thread not spawn
+ posix_proactor_ (posix_proactor) //save concrete proactor impl.
{
}
int
-ACE_POSIX_AIOCB_Asynch_Accept::accept (ACE_Message_Block &message_block,
+ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
u_long bytes_to_read,
ACE_HANDLE accept_handle,
const void *act,
@@ -2032,7 +1980,7 @@ ACE_POSIX_AIOCB_Asynch_Accept::accept (ACE_Message_Block &message_block,
message_block,
bytes_to_read,
act,
- this->posix_proactor ()->get_handle (),
+ this->posix_proactor_->get_handle (),
priority,
signal_number),
-1);
@@ -2045,11 +1993,21 @@ ACE_POSIX_AIOCB_Asynch_Accept::accept (ACE_Message_Block &message_block,
}
int
-ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler,
+ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler,
ACE_HANDLE handle,
const void *completion_key,
ACE_Proactor *proactor)
{
+ // check for non zero accept_handler_
+ // we could not create a new handler without closing the previous
+
+ if ( this->accept_handler_ != 0 )
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:ACE_POSIX_Asynch_Accept::open:"
+ "accept_handler_ not null\n"),
+ -1);
+
+
int result = ACE_POSIX_Asynch_Operation::open (handler,
handle,
completion_key,
@@ -2060,8 +2018,8 @@ ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler,
// Init the Asynch_Accept_Handler now. It needs to keep Proactor
// also with it.
ACE_NEW_RETURN (this->accept_handler_,
- ACE_POSIX_AIOCB_Asynch_Accept_Handler (&this->reactor_,
- this->posix_proactor ()),
+ ACE_POSIX_Asynch_Accept_Handler (&this->reactor_,
+ this->posix_proactor_),
-1);
// Register the handle with the reactor.
@@ -2083,9 +2041,11 @@ ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler,
// Spawn the thread. It is the only thread we are going to have. It
// will do the <handle_events> on the reactor.
- return_val = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_AIOCB_Asynch_Accept::thread_function,
- ACE_reinterpret_cast (void *, &this->reactor_));
- if (return_val == -1)
+ // save group id of the created thread
+
+ grp_id_ = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_Asynch_Accept::thread_function,
+ ACE_reinterpret_cast (void *, &this->reactor_));
+ if (grp_id_ == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%N:%l:Thread_Manager::spawn failed\n"),
-1);
@@ -2093,12 +2053,73 @@ ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler,
return 0;
}
-ACE_POSIX_AIOCB_Asynch_Accept::~ACE_POSIX_AIOCB_Asynch_Accept (void)
+ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void)
{
+ this->close (0); // not send notifications to user
+}
+
+int
+ACE_POSIX_Asynch_Accept::close ( int flg_notify)
+{
+ // 1. It performs cancellation of all pending requests
+ // 2. Stops and waits for the thread we had created
+ // 3. Removes accept_handler_ from Reactor
+ // 4. Deletes accept_handler_
+ // 5. close the socket
+ //
+ // Parameter flg_notify can be
+ // 0 - don't send notifications about canceled accepts
+ // !0 - notify user about canceled accepts
+ // according POSIX standards we should receive notifications
+ // on canceled AIO requests
+ //
+ // Return codes : 0 - OK ,
+ // -1 - Errors
+
+ if ( this->accept_handler_ )
+ this->accept_handler_->cancel_uncompleted ( flg_notify );
+
+ //stop and wait for the thread
+
+ if ( grp_id_ != -1 )
+ {
+ reactor_.end_reactor_event_loop ();
+
+ if ( ACE_Thread_Manager::instance ()->wait_grp (grp_id_) ==-1)
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:Thread_Manager::wait_grp failed\n"));
+ else
+ grp_id_ = -1;
+ }
+
+ //AL remove and destroy accept_handler_
+
+ if ( this->accept_handler_ != 0 )
+ {
+ this->reactor_.remove_handler
+ ( this->accept_handler_,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL );
+
+ delete this->accept_handler_ ;
+ this->accept_handler_ = 0 ;
+ }
+
+ // It looks like a good place to close listen handle here.
+ // But I am not sure with compatibility with the old programs
+ // You can comment the closure of the socket
+
+ if ( this->handle_ != ACE_INVALID_HANDLE )
+ {
+ ACE_OS::closesocket (this->handle_);
+ this->handle_=ACE_INVALID_HANDLE;
+ }
+
+ return 0;
}
void*
-ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor)
+ACE_POSIX_Asynch_Accept::thread_function (void* arg_reactor)
{
// Retrieve the reactor pointer from the argument.
ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *,
@@ -2116,7 +2137,9 @@ ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor)
// Handle events.
int result = 0;
- while (result != -1)
+
+ //while (result != -1)
+ while ( reactor->reactor_event_loop_done () == 0 )
{
result = reactor->handle_events ();
}
@@ -2128,151 +2151,36 @@ ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor)
// the call to the ACE_POSIX_Asynch_Operation base class.
int
-ACE_POSIX_AIOCB_Asynch_Accept::cancel (void)
-{
- return ACE_POSIX_Asynch_Operation::cancel ();
-}
-
-ACE_Proactor *
-ACE_POSIX_AIOCB_Asynch_Accept::proactor (void) const
+ACE_POSIX_Asynch_Accept::cancel (void)
{
- return ACE_POSIX_Asynch_Operation::proactor ();
-}
+ //We are not ACE_POSIX_Asynch_Operation
+ //so we could not call ::aiocancel ()
+ //We delegate real cancelation to the accept_handler_
+ // accept_handler_->cancel_uncompleted (1)
-// *********************************************************************
+ //return ACE_POSIX_Asynch_Operation::cancel ();
-ACE_POSIX_SIG_Asynch_Accept::ACE_POSIX_SIG_Asynch_Accept (ACE_POSIX_SIG_Proactor *posix_sig_proactor)
- : ACE_Asynch_Operation_Impl (),
- ACE_Asynch_Accept_Impl (),
- ACE_POSIX_SIG_Asynch_Operation (posix_sig_proactor),
- accept_handler_ (0)
-{
-}
+ if ( this->accept_handler_ == 0 )
+ return 1 ; // AIO_ALLDONE
-int
-ACE_POSIX_SIG_Asynch_Accept::accept (ACE_Message_Block &message_block,
- u_long bytes_to_read,
- ACE_HANDLE accept_handle,
- const void *act,
- int priority,
- int signal_number)
-{
- // Sanity check: make sure that enough space has been allocated by
- // the caller.
- size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr);
- size_t space_in_use = message_block.wr_ptr () - message_block.base ();
- size_t total_size = message_block.size ();
- size_t available_space = total_size - space_in_use;
- size_t space_needed = bytes_to_read + 2 * address_size;
- if (available_space < space_needed)
- ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("Buffer too small\n")), -1);
+ //cancel with notifications as POSIX should do
- // Common code for both WIN and POSIX.
- ACE_POSIX_Asynch_Accept_Result *result = 0;
- ACE_NEW_RETURN (result,
- ACE_POSIX_Asynch_Accept_Result (*this->handler_,
- this->handle_,
- accept_handle,
- message_block,
- bytes_to_read,
- act,
- this->posix_sig_proactor_->get_handle (),
- priority,
- signal_number),
- -1);
+ int retval = this->accept_handler_->cancel_uncompleted (1);
- // Register this <accept> call with the local handler.
- this->accept_handler_->register_accept_call (result);
+ //retval contains now the number of canceled requests
+
+ if ( retval == 0 )
+ return 1 ; // AIO_ALLDONE
- return 0;
-}
+ if ( retval > 0 )
+ return 0; // AIO_CANCELED
+ return -1;
-int
-ACE_POSIX_SIG_Asynch_Accept::open (ACE_Handler &handler,
- ACE_HANDLE handle,
- const void *completion_key,
- ACE_Proactor *proactor)
-{
- int result = ACE_POSIX_Asynch_Operation::open (handler,
- handle,
- completion_key,
- proactor);
- if (result == -1)
- return result;
-
- // Init the Asynch_Accept_Handler now. It needs to keep Proactor
- // also with it.
- ACE_NEW_RETURN (this->accept_handler_,
- ACE_POSIX_SIG_Asynch_Accept_Handler (&this->reactor_,
- this->posix_proactor ()),
- -1);
-
- // Register the handle with the reactor.
- int return_val = this->reactor_.register_handler (this->handle_,
- this->accept_handler_,
- ACE_Event_Handler::ACCEPT_MASK);
- if (return_val == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:Reactor::register_handler failed\n"),
- -1);
-
- // Suspend the <handle> now. Enable only when the <accept> is issued
- // by the application.
- return_val = this->reactor_.suspend_handler (this->handle_);
- if (return_val == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:Reactor::suspend_handler failed\n"),
- -1);
-
- // Spawn the thread. It is the only thread we are going to have. It
- // will do the <handle_events> on the reactor.
- return_val = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_SIG_Asynch_Accept::thread_function,
- (void *)&this->reactor_);
- if (return_val == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:Thread_Manager::spawn failed\n"),
- -1);
-
- return 0;
-}
-
-ACE_POSIX_SIG_Asynch_Accept::~ACE_POSIX_SIG_Asynch_Accept (void)
-{
-}
-
-void*
-ACE_POSIX_SIG_Asynch_Accept::thread_function (void* arg_reactor)
-{
- // Retrieve the reactor pointer from the argument.
- ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *,
- arg_reactor);
- if (reactor == 0)
- reactor = ACE_Reactor::instance ();
-
- // For this reactor, this thread is the owner.
- reactor->owner (ACE_OS::thr_self ());
-
- // Handle events. Wait for any connection events.
- int result = 0;
- while (result != -1)
- result = reactor->handle_events ();
-
- return 0;
-}
-
-// Methods belong to ACE_POSIX_Asynch_Operation base class. These
-// methods are defined here to avoid dominance warnings. They route
-// the call to the ACE_POSIX_Asynch_Operation base class.
-
-int
-ACE_POSIX_SIG_Asynch_Accept::cancel (void)
-{
- return ACE_POSIX_Asynch_Operation::cancel ();
}
ACE_Proactor *
-ACE_POSIX_SIG_Asynch_Accept::proactor (void) const
+ACE_POSIX_Asynch_Accept::proactor (void) const
{
return ACE_POSIX_Asynch_Operation::proactor ();
}
diff --git a/ace/POSIX_Asynch_IO.h b/ace/POSIX_Asynch_IO.h
index beac02277b0..9ede6b5c37a 100644
--- a/ace/POSIX_Asynch_IO.h
+++ b/ace/POSIX_Asynch_IO.h
@@ -1,4 +1,4 @@
-// -*- C++ -*-
+/* -*- C++ -*- */
//=============================================================================
/**
@@ -98,6 +98,12 @@ public:
/// Destructor.
virtual ~ACE_POSIX_Asynch_Result (void);
+ /// Simulate error value to use in the post_completion ()
+ void set_error (u_long errcode);
+
+ /// Simulate value to use in the post_completion ()
+ void set_bytes_transferred (u_long nbytes);
+
protected:
/// Constructor. <Event> is not used on POSIX.
ACE_POSIX_Asynch_Result (ACE_Handler &handler,
@@ -607,7 +613,7 @@ public:
int priority,
int signal_number = 0);
- /// Destrcutor.
+ /// Destructor.
virtual ~ACE_POSIX_AIOCB_Asynch_Write_Stream (void);
// = Methods belong to ACE_POSIX_Asynch_Operation base class. These
@@ -1084,7 +1090,7 @@ public:
/**
* This starts off an asynchronous write. Upto <bytes_to_write>
- * will be write and stored in the <message_block>. The write will
+ * will be written and stored in the <message_block>. The write will
* start at <offset> from the beginning of the file.
*/
int write (ACE_Message_Block &message_block,
@@ -1157,7 +1163,7 @@ public:
/**
* This starts off an asynchronous write. Upto <bytes_to_write>
- * will be write and stored in the <message_block>. The write will
+ * will be written and stored in the <message_block>. The write will
* start at <offset> from the beginning of the file.
*/
int write (ACE_Message_Block &message_block,
@@ -1222,8 +1228,8 @@ class ACE_Export ACE_POSIX_Asynch_Accept_Result : public virtual ACE_Asynch_Acce
public ACE_POSIX_Asynch_Result
{
/// Factory classes willl have special permissions.
- friend class ACE_POSIX_AIOCB_Asynch_Accept;
- friend class ACE_POSIX_SIG_Asynch_Accept;
+ friend class ACE_POSIX_Asynch_Accept;
+ friend class ACE_POSIX_Asynch_Accept_Handler;
/// The Proactor constructs the Result class for faking results.
friend class ACE_POSIX_Proactor;
@@ -1330,23 +1336,24 @@ protected:
};
/**
- * @class ACE_POSIX_AIOCB_Asynch_Accept_Handler
+ * @class ACE_POSIX_Asynch_Accept_Handler
*
* Forward declaration. This class is defined the in the cpp file,
* since this is internal to the implementation.
*/
-class ACE_POSIX_AIOCB_Asynch_Accept_Handler;
+class ACE_POSIX_Asynch_Accept_Handler;
/**
- * @class ACE_POSIX_AIOCB_Asynch_Accept
+ * @class ACE_POSIX_Asynch_Accept
*
*/
-class ACE_Export ACE_POSIX_AIOCB_Asynch_Accept : public virtual ACE_Asynch_Accept_Impl,
- public ACE_POSIX_AIOCB_Asynch_Operation
+class ACE_Export ACE_POSIX_Asynch_Accept :
+ public virtual ACE_Asynch_Accept_Impl,
+ public ACE_POSIX_Asynch_Operation
{
public:
/// Constructor.
- ACE_POSIX_AIOCB_Asynch_Accept (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
+ ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor * posix_proactor);
/**
* This <open> belongs to ACE_AIOCB_Asynch_Operation. We forward
@@ -1377,15 +1384,28 @@ public:
int signal_number = 0);
/// Destructor.
- virtual ~ACE_POSIX_AIOCB_Asynch_Accept (void);
+ virtual ~ACE_POSIX_Asynch_Accept (void);
// = Methods belong to ACE_POSIX_Asynch_Operation base class. These
// methods are defined here to avoid dominace warnings. They route
// the call to the ACE_POSIX_Asynch_Operation base class.
- ///
- /// @@ Not implemented. Returns 0.
+ /**
+ * Cancel all pending pseudo-asynchronus requests
+ * Behavior as usual AIO request
+ */
int cancel (void);
+
+ /**
+ * Close performs cancellation of all pending requests
+ * Parameter flg_notify can be
+ * 0 - don't send notifications about canceled accepts
+ * 1 - notify user about canceled accepts
+ * according POSIX standards we should receive notifications
+ * on canceled AIO requests
+ */
+ int close ( int flg_notify);
+
/// Return the underlying proactor.
ACE_Proactor* proactor (void) const;
@@ -1398,83 +1418,16 @@ private:
ACE_Reactor reactor_;
/// The Event Handler to do handle_input.
- ACE_POSIX_AIOCB_Asynch_Accept_Handler* accept_handler_;
-};
-
-/**
- * @class ACE_POSIX_SIG_Asynch_Accept_Handler;
- *
- * Forward declaration. This class is defined the in the cpp file,
- * since this is internal to the implementation.
- */
-class ACE_POSIX_SIG_Asynch_Accept_Handler;
-
-/**
- * @class ACE_POSIX_SIG_Asynch_Accept
- *
- * @brief This class implements <ACE_Asynch_Accept> for Realtime
- * Signal (<sigtimedwait>) based implementation of Proactor.
- */
-class ACE_Export ACE_POSIX_SIG_Asynch_Accept : public virtual ACE_Asynch_Accept_Impl,
- public ACE_POSIX_SIG_Asynch_Operation
-{
-public:
- /// Constructor.
- ACE_POSIX_SIG_Asynch_Accept (ACE_POSIX_SIG_Proactor *posix_sig_proactor);
+ ACE_POSIX_Asynch_Accept_Handler* accept_handler_;
- /**
- * This <open> belongs to ACE_SIG_Asynch_Operation. We forward this
- * call to that method. We have put this here to avoid the compiler
- * warnings.
- */
- int open (ACE_Handler &handler,
- ACE_HANDLE handle,
- const void *completion_key,
- ACE_Proactor *proactor = 0);
+ /// group id for the thread that we create for accepts
+ int grp_id_ ;
- /**
- * This starts off an asynchronous accept. The asynchronous accept
- * call also allows any initial data to be returned to the
- * <handler>. Upto <bytes_to_read> will be read and stored in the
- * <message_block>. The <accept_handle> will be used for the
- * <accept> call. If (<accept_handle> == INVALID_HANDLE), a new
- * handle will be created.
- *
- * <message_block> must be specified. This is because the address of
- * the new connection is placed at the end of this buffer.
- */
- int accept (ACE_Message_Block &message_block,
- u_long bytes_to_read,
- ACE_HANDLE accept_handle,
- const void *act,
- int priority,
- int signal_number);
-
- /// Destructor.
- virtual ~ACE_POSIX_SIG_Asynch_Accept (void);
-
- // = Methods belong to ACE_POSIX_Asynch_Operation base class. These
- // methods are defined here to avoid dominace warnings. They route
- // the call to the ACE_POSIX_Asynch_Operation base class.
-
- ///
- /// @@ Not implemented. Returns 0.
- int cancel (void);
-
- /// Return the underlying proactor.
- ACE_Proactor* proactor (void) const;
-
-private:
- /// The thread function that does handle events.
- static void* thread_function (void* reactor);
-
- /// Reactor to wait on the <listen_handle>.
- ACE_Reactor reactor_;
-
- /// The Event Handler to do handle_input.
- ACE_POSIX_SIG_Asynch_Accept_Handler* accept_handler_;
+ /// POSIX Proactor implementation
+ ACE_POSIX_Proactor * posix_proactor_;
};
+
/**
* @class ACE_POSIX_Asynch_Transmit_File_Result
*
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index 8634454e154..7a5aa21b73e 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -1,3 +1,4 @@
+/* -*- C++ -*- */
// $Id$
#define ACE_BUILD_DLL
@@ -421,6 +422,9 @@ ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Pr
// Open the pipe.
this->pipe_.open ();
+ // Let AIOCB_Proactor know about our handle
+ posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ());
+
// Open the read stream.
if (this->read_stream_.open (*this,
this->pipe_.read_handle (),
@@ -474,11 +478,12 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream:
asynch_result = *(ACE_POSIX_Asynch_Result **) result.message_block ().rd_ptr ();
// Do the upcall.
- this->posix_aiocb_proactor_->application_specific_code (asynch_result,
- 0, // No Bytes transferred.
- 1, // Success.
- 0, // Completion token.
- 0); // Error.
+ this->posix_aiocb_proactor_->application_specific_code
+ (asynch_result,
+ asynch_result->bytes_transferred(), // 0, No bytes transferred.
+ 1, // Result : True.
+ 0, // No completion key.
+ asynch_result->error()); //0, No error.
// Set the message block properly. Put the <wr_ptr> back in the
// initial position.
@@ -504,12 +509,13 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
aiocb_list_ (0),
result_list_ (0),
aiocb_list_max_size_ (max_aio_operations),
- aiocb_list_cur_size_ (0)
+ aiocb_list_cur_size_ (0),
+ notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
+ num_deferred_aiocb_ (0),
+ num_started_aio_(0)
{
- if (aiocb_list_max_size_ > 8192)
- // @@ Alex, this shouldn't be a magic number -- it should be a
- // constant, e.g., ACE_AIO_MAX_SIZE or something.
- aiocb_list_max_size_ = 8192;
+ //check for correct value for max_aio_operations
+ check_max_aio_num () ;
ACE_NEW (aiocb_list_,
aiocb *[aiocb_list_max_size_]);
@@ -532,14 +538,15 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,in
aiocb_list_ (0),
result_list_ (0),
aiocb_list_max_size_ (max_aio_operations),
- aiocb_list_cur_size_ (0)
+ aiocb_list_cur_size_ (0),
+ notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
+ num_deferred_aiocb_ (0),
+ num_started_aio_(0)
{
ACE_UNUSED_ARG (Flg);
- if (aiocb_list_max_size_ > 8192)
- // @@ Alex, this shouldn't be a magic number -- it should be a
- // constant, e.g., ACE_AIO_MAX_SIZE or something.
- aiocb_list_max_size_ = 8192;
+ //check for correct value for max_aio_operations
+ check_max_aio_num () ;
ACE_NEW (aiocb_list_,
aiocb *[aiocb_list_max_size_]);
@@ -562,13 +569,70 @@ ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
{
delete_notify_manager ();
+ // delete all uncomlpeted operarion
+ // as nobody will notify client since now
+ for (size_t ai = 0; ai < aiocb_list_max_size_; ai++)
+ {
+ delete result_list_[ai] ;
+ result_list_[ai] = 0;
+ aiocb_list_[ai] = 0;
+ }
+
+
delete [] aiocb_list_;
aiocb_list_ = 0;
-
+
delete [] result_list_;
result_list_ = 0;
}
+void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
+{
+ notify_pipe_read_handle_ = h ;
+}
+
+void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
+{
+ long max_os_aio_num = ACE_OS ::sysconf ( _SC_AIO_MAX );
+
+ // Define max limit AIO's for concrete OS
+ // -1 means that there is no limit, but it is not true
+ // ( example, SunOS 5.6)
+
+ if ( max_os_aio_num > 0
+ && aiocb_list_max_size_ > ( unsigned long ) max_os_aio_num
+ )
+ aiocb_list_max_size_ = max_os_aio_num ;
+
+#if defined(HPUX)
+
+ // Although HPUX 11.00 allows to start 2048 AIO's
+ // for all process in system
+ // it has a limit 256 max elements for aio_suspend ()
+ // It is a pity, but ...
+
+ long max_os_listio_num = ACE_OS ::sysconf ( _SC_AIO_LISTIO_MAX );
+ if ( max_os_listio_num > 0
+ && aiocb_list_max_size_ > ( unsigned long ) max_os_listio_num
+ )
+ aiocb_list_max_size_ = max_os_listio_num ;
+
+#endif
+
+ // The last check for user-defined value
+ // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h
+
+ if ( aiocb_list_max_size_ <= 0
+ || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE
+ )
+ aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
+ aiocb_list_max_size_));
+
+}
+
void
ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
{
@@ -656,8 +720,10 @@ ACE_POSIX_AIOCB_Proactor::create_asynch_accept (void)
{
ACE_Asynch_Accept_Impl *implementation = 0;
ACE_NEW_RETURN (implementation,
- ACE_POSIX_AIOCB_Asynch_Accept (this),
+ ACE_POSIX_Asynch_Accept (this),
0);
+ //was ACE_POSIX_AIOCB_Asynch_Accept (this)
+
return implementation;
}
@@ -705,43 +771,56 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds)
0); // let continue work
}
+ size_t index = 0;
int error_status = 0;
int return_status = 0;
- ACE_POSIX_Asynch_Result *asynch_result =
- find_completed_aio (error_status, return_status);
+ int retval= 0;
- if (asynch_result == 0)
- return 0;
+ for ( ; ; )
+ {
+ ACE_POSIX_Asynch_Result *asynch_result =
+ find_completed_aio (error_status, return_status,index);
+
+ if (asynch_result == 0)
+ break;
+
+ //at least one processed
+ retval = 1 ; // more informative retval++
- // Call the application code.
- this->application_specific_code (asynch_result,
+ // Call the application code.
+ this->application_specific_code (asynch_result,
return_status, // Bytes transferred.
1, // Success
0, // No completion key.
error_status); // Error
- return 1;
+ }
+
+ return retval;
}
ACE_POSIX_Asynch_Result *
ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
- int &return_status)
+ int &return_status,
+ size_t &index )
{
ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
-
- size_t ai;
+
ACE_POSIX_Asynch_Result *asynch_result = 0;
error_status = 0;
return_status= 0;
-
- for (ai = 0; ai < aiocb_list_max_size_; ai++)
+
+ if ( num_started_aio_ == 0 ) // save time
+ return asynch_result;
+
+ for (; index < aiocb_list_max_size_; index++)
{
- if (aiocb_list_[ai] == 0) // Dont process null blocks.
+ if (aiocb_list_[index] == 0) // Dont process null blocks.
continue;
// Get the error status of the aio_ operation.
- error_status = aio_error (aiocb_list_[ai]);
+ error_status = aio_error (aiocb_list_[index]);
if (error_status == -1) // <aio_error> itself has failed.
{
@@ -750,12 +829,16 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
"ACE_POSIX_AIOCB_Proactor::find_completed_aio:"
"<aio_error> has failed\n"));
- // skip this operation
- aiocb_list_[ai] = 0;
- result_list_[ai] = 0;
- aiocb_list_cur_size_--;
-
- continue;
+ break;
+
+ // we should notify user, otherwise :
+ // memory leak for result and "hanging" user
+ // what was before skip this operation
+
+ //aiocb_list_[index] = 0;
+ //result_list_[index] = 0;
+ //aiocb_list_cur_size_--;
+ //continue;
}
// Continue the loop if <aio_> operation is still in progress.
@@ -764,29 +847,40 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
} // end for
- if (ai >= this->aiocb_list_max_size_) // all processed
+ if (index >= this->aiocb_list_max_size_) // all processed
return asynch_result;
else if (error_status == ECANCELED)
return_status = 0;
- else
- return_status = aio_return (aiocb_list_[ai]);
-
+ else if (error_status == -1)
+ return_status = 0;
+ else
+ return_status = aio_return (aiocb_list_[index]);
+
if (return_status == -1)
{
- // was ACE_ERROR_RETURN
- ACE_ERROR ((LM_ERROR,
+ return_status = 0; // zero bytes transferred
+
+ if (error_status == 0) // nonsense
+ ACE_ERROR ((LM_ERROR,
"%N:%l:(%P | %t)::%p\n",
"ACE_POSIX_AIOCB_Proactor::find_completed_aio:"
"<aio_return> failed\n"));
- return_status = 0; // zero bytes transferred
}
- asynch_result = result_list_[ai];
+
+ asynch_result = result_list_[index];
- aiocb_list_[ai] = 0;
- result_list_[ai] = 0;
+ aiocb_list_[index] = 0;
+ result_list_[index] = 0;
aiocb_list_cur_size_--;
+ num_started_aio_--; // decrement count active aios
+ index++ ; // for next iteration
+
+ this->start_deferred_aio ();
+ //make attempt to start deferred AIO
+ //It is safe as we are protected by mutex_
+
return asynch_result;
}
@@ -811,42 +905,107 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio
ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::register_and_start_aio");
ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
-
+
int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
if (result == 0) // Just check the status of the list
return ret_val;
- // Non-zero ptr. Find a free slot and store.
- if (ret_val == 0)
+ // Save operation code in the aiocb
+ switch ( op )
+ {
+ case 0 :
+ result->aio_lio_opcode = LIO_READ;
+ break;
+
+ case 1 :
+ result->aio_lio_opcode = LIO_WRITE;
+ break;
+
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::\n"
+ "register_and_start_aio: Invalid operation code\n"),
+ -1);
+ }
+
+ if (ret_val != 0) // No free slot
+ {
+ errno = EAGAIN;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::\n"
+ "register_and_start_aio: "
+ "No space to store the <aio>info\n"),
+ -1);
+ }
+
+ // Find a free slot and store.
+ // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager
+ // so make check for ACE_AIOCB_Notify_Pipe_Manager request
+
+ size_t i = 0;
+
+ if ( notify_pipe_read_handle_ == result->aio_fildes ) // Notify_Pipe ?
+ { // should be free,
+ if ( result_list_[i] != 0 ) // only 1 request
+ { // is allowed
+ errno = EAGAIN;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::\n"
+ "register_and_start_aio:"
+ "internal Proactor error 0\n"),
+ -1 );
+ }
+ }
+ else //try to find free slot as usual, but starting from 1
{
- for (size_t i= 0; i < this->aiocb_list_max_size_; i++)
- if (aiocb_list_[i] == 0)
- {
- ret_val = start_aio (result, op);
-
- if (ret_val == 0) // Store the pointers.
- {
- aiocb_list_[i] = result;
- result_list_[i] = result;
-
- aiocb_list_cur_size_++;
- }
- return ret_val;
- }
-
- errno = EAGAIN;
- ret_val = -1;
+ for ( i= 1; i < this->aiocb_list_max_size_; i++)
+ if (result_list_[i] == 0)
+ break ;
}
-
- ACE_ERROR ((LM_ERROR,
+
+ if ( i >= this->aiocb_list_max_size_ )
+ ACE_ERROR_RETURN ((LM_ERROR,
"%N:%l:(%P | %t)::\n"
- "register_and_start_aio: No space to store the <aio>info\n"));
- return ret_val;
+ "register_and_start_aio: "
+ "internal Proactor error 1\n"),
+ -1);
+
+ result_list_[i] = result; //Store result ptr anyway
+ aiocb_list_cur_size_++;
+
+ ret_val = start_aio (result);
+
+ switch ( ret_val )
+ {
+ case 0 : // started OK
+ aiocb_list_[i] = result;
+ return 0 ;
+
+ case 1 : //OS AIO queue overflow
+ num_deferred_aiocb_ ++ ;
+ return 0 ;
+
+ default: //Invalid request, there is no point
+ break; // to start it later
+ }
+
+ result_list_[i] = 0;
+ aiocb_list_cur_size_--;
+
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "register_and_start_aio: Invalid request to start <aio>\n"));
+ return -1;
}
+// start_aio has new return codes
+// 0 AIO was started successfully
+// 1 AIO was not started, OS AIO queue overflow
+// -1 AIO was not started, other errors
+
int
-ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, int op)
+ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result)
{
ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
@@ -855,13 +1014,13 @@ ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, int op)
// Start IO
- switch (op)
+ switch (result->aio_lio_opcode )
{
- case 0:
+ case LIO_READ :
ptype = "read ";
ret_val = aio_read (result);
break;
- case 1:
+ case LIO_WRITE :
ptype = "write";
ret_val = aio_write (result);
break;
@@ -870,16 +1029,177 @@ ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, int op)
ret_val = -1;
break;
}
-
- if (ret_val == -1)
- ACE_ERROR ((LM_ERROR,
+
+ if (ret_val == 0 )
+ num_started_aio_ ++ ;
+ else // if (ret_val == -1)
+ {
+ if ( errno == EAGAIN ) //Ok, it will be deferred AIO
+ ret_val = 1 ;
+ else
+ ACE_ERROR ((LM_ERROR,
"%N:%l:(%P | %t)::start_aio: aio_%s %p\n",
ptype,
"queueing failed\n"));
+ }
return ret_val;
}
+
+int
+ACE_POSIX_AIOCB_Proactor::start_deferred_aio ()
+{
+ ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
+
+ // This protected method is called from
+ // find_completed_aio after any AIO completion
+ // We should call this method always with locked
+ // ACE_POSIX_AIOCB_Proactor::mutex_
+ //
+ // It tries to start the first deferred AIO
+ // if such exists
+
+ if ( num_deferred_aiocb_ == 0 )
+ return 0 ; // nothing to do
+
+ size_t i = 0;
+
+ for ( i= 0; i < this->aiocb_list_max_size_; i++)
+ if ( result_list_[i] !=0 // check for
+ && aiocb_list_[i] ==0 ) // deferred AIO
+ break ;
+
+ if ( i >= this->aiocb_list_max_size_ )
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::\n"
+ "start_deferred_aio:"
+ "internal Proactor error 3\n"),
+ -1);
+
+ ACE_POSIX_Asynch_Result *result = result_list_[i];
+
+ int ret_val = start_aio (result);
+
+ switch ( ret_val )
+ {
+ case 0 : //started OK , decrement count of deferred AIOs
+ aiocb_list_[i] = result;
+ num_deferred_aiocb_ -- ;
+ return 0 ;
+
+ case 1 :
+ return 0 ; //try again later
+
+ default : // Invalid Parameters , should never be
+ break ;
+ }
+
+ //AL notify user
+
+ result_list_[i] = 0;
+ aiocb_list_cur_size_--;
+
+ num_deferred_aiocb_ -- ;
+
+ result->set_error (errno);
+ result->set_bytes_transferred (0);
+ this->post_completion ( result );
+
+ return -1;
+}
+
+int
+ACE_POSIX_AIOCB_Proactor::cancel_aio ( ACE_HANDLE handle )
+{
+ // This new method should be called from
+ // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel
+ // It scans the result_list_ and defines all AIO requests
+ // that were issued for handle "handle"
+ //
+ // For all deferred AIO requests with handle "handle"
+ // it removes its from the lists and notifies user
+ //
+ // For all running AIO requests with handle "handle"
+ // it calls ::aio_cancel. According to the POSIX standards
+ // we will receive ECANCELED for all ::aio_canceled AIO requests
+ // later on return from ::aio_suspend
+
+ ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
+
+ int num_total = 0;
+ int num_cancelled = 0;
+ size_t ai = 0;
+
+ for (ai = 0; ai < aiocb_list_max_size_; ai++)
+ {
+ if ( result_list_[ai] == 0 ) //skip empty slot
+ continue ;
+
+ if ( result_list_[ai]->aio_fildes != handle ) //skip not our slot
+ continue ;
+
+ num_total++ ;
+
+ ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai];
+
+ if ( aiocb_list_ [ai] == 0 ) //deferred aio
+ {
+ num_cancelled ++ ;
+ num_deferred_aiocb_ -- ;
+
+ aiocb_list_[ai] = 0;
+ result_list_[ai] = 0;
+ aiocb_list_cur_size_--;
+
+ asynch_result->set_error (ECANCELED);
+ asynch_result->set_bytes_transferred (0);
+ this->post_completion ( asynch_result );
+ }
+ else //cancel started aio
+ {
+ int rc_cancel = this->cancel_aiocb (asynch_result );
+
+ if ( rc_cancel == 0 ) //notification in the future
+ num_cancelled ++ ; //it is OS responsiblity
+ }
+ }
+
+ if ( num_total == 0 )
+ return 1; // ALLDONE
+
+ if ( num_cancelled == num_total )
+ return 0; // CANCELLED
+
+ return 2; // NOT CANCELLED
+}
+
+int
+ACE_POSIX_AIOCB_Proactor::cancel_aiocb ( ACE_POSIX_Asynch_Result * result )
+{
+ // This new method is called from cancel_aio
+ // to cancel concrete running AIO request
+ int rc = ::aio_cancel (0, result );
+
+ // Check the return value and return 0/1/2 appropriately.
+ if (rc == AIO_CANCELED)
+ return 0;
+ else if (rc == AIO_ALLDONE)
+ return 1;
+ else if (rc == AIO_NOTCANCELED)
+ return 2;
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "cancel_aiocb:"
+ "Unexpected result from <aio_cancel>"),
+ -1);
+
+}
+
+
// *********************************************************************
ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void)
@@ -1048,8 +1368,10 @@ ACE_POSIX_SIG_Proactor::create_asynch_accept (void)
{
ACE_Asynch_Accept_Impl *implementation = 0;
ACE_NEW_RETURN (implementation,
- ACE_POSIX_SIG_Asynch_Accept (this),
+ ACE_POSIX_Asynch_Accept (this),
0);
+
+ // was ACE_POSIX_SIG_Asynch_Accept (this),
return implementation;
}
@@ -1274,7 +1596,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
// Failure.
if (return_status == -1)
- {
+ {
ACE_ERROR ((LM_ERROR,
"%N:%l:(%P | %t)::%p\n",
"ACE_POSIX_SIG_Proactor::handle_events:"
@@ -1294,11 +1616,12 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
}
else if (sig_info.si_code == SI_QUEUE)
{
- this->application_specific_code (asynch_result,
- 0, // No bytes transferred.
- 1, // Result : True.
- 0, // No completion key.
- 0); // No error.
+ this->application_specific_code
+ (asynch_result,
+ asynch_result->bytes_transferred(), // 0, No bytes transferred.
+ 1, // Result : True.
+ 0, // No completion key.
+ asynch_result->error()); //0, No error.
}
else
// Unknown signal code.
diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h
index d8f61a25b0f..836c97e3459 100644
--- a/ace/POSIX_Proactor.h
+++ b/ace/POSIX_Proactor.h
@@ -26,6 +26,9 @@
#include "ace/Pipe.h"
#include "ace/POSIX_Asynch_IO.h"
+#define ACE_AIO_MAX_SIZE 2048
+#define ACE_AIO_DEFAULT_SIZE 1024
+
/**
* @class ACE_POSIX_Proactor
*
@@ -234,7 +237,7 @@ class ACE_Export ACE_POSIX_AIOCB_Proactor : public ACE_POSIX_Proactor
public:
/// Constructor defines max number asynchronous operations
/// which can be started at the same time
- ACE_POSIX_AIOCB_Proactor (size_t nmaxop = 256) ; //ACE_RTSIG_MAX
+ ACE_POSIX_AIOCB_Proactor (size_t nmaxop = ACE_AIO_DEFAULT_SIZE);
virtual Proactor_Type get_impl_type (void);
@@ -276,6 +279,19 @@ public:
virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void);
+ /**
+ * This method should be called from
+ * ACE_POSIX_Asynch_Operation::cancel()
+ * instead of usual ::aio_cancel.
+ * For all deferred AIO requests with handle "h"
+ * it removes its from the lists and notifies user.
+ * For all running AIO requests with handle "h"
+ * it calls ::aio_cancel. According to the POSIX standards
+ * we will receive ECANCELED for all ::aio_canceled AIO requests
+ * later on return from ::aio_suspend
+ */
+ virtual int cancel_aio (ACE_HANDLE h);
+
protected:
/// Special constructor for ACE_SUN_Proactor
@@ -286,6 +302,15 @@ protected:
void create_notify_manager (void);
void delete_notify_manager (void);
+ /// Define the maximum number of asynchronous I/O requests
+ /// for the current OS
+ void check_max_aio_num (void) ;
+
+ /// To identify requests from Notify_Pipe_Manager
+ void set_notify_handle (ACE_HANDLE h);
+
+
+
/**
* Dispatch a single set of events. If <milli_seconds> elapses
* before any events occur, return 0. Return 1 if a completion
@@ -303,12 +328,20 @@ protected:
virtual int register_and_start_aio (ACE_POSIX_Asynch_Result *result,
int op);
- virtual int start_aio (ACE_POSIX_Asynch_Result *result,
- int op);
+
+ /// Op code now is saved in ACE_POSIX_Asynch_Result
+ virtual int start_aio (ACE_POSIX_Asynch_Result *result);
+
+ /// Start deferred AIO if necessary
+ int start_deferred_aio();
+
+ /// Cancel running or deferred AIO
+ virtual int cancel_aiocb ( ACE_POSIX_Asynch_Result * result );
/// Extract the results of aio.
ACE_POSIX_Asynch_Result *find_completed_aio (int &error_status,
- int &return_status);
+ int &return_status,
+ size_t &index );
/// This class takes care of doing <accept> when we use
/// AIO_CONTROL_BLOCKS strategy.
@@ -329,6 +362,18 @@ protected:
/// Mutex to protect work with lists.
ACE_Thread_Mutex mutex_;
#endif /* ACE_MT_SAFE */
+
+ /// The purpose of this member is only to identify asynchronous request
+ /// from NotifyManager. We will reserve for it always slot 0
+ /// in the list of aiocb's to be sure that don't lose notifications.
+ ACE_HANDLE notify_pipe_read_handle_ ;
+
+ /// number of ACE_POSIX_Asynch_Result's waiting for start
+ /// i.e. deferred AIOs
+ size_t num_deferred_aiocb_ ;
+
+ /// Number active,i.e. running requests
+ size_t num_started_aio_ ;
};
/**
diff --git a/ace/SUN_Proactor.cpp b/ace/SUN_Proactor.cpp
index 33c0f266fcd..f341bce521d 100644
--- a/ace/SUN_Proactor.cpp
+++ b/ace/SUN_Proactor.cpp
@@ -1,3 +1,4 @@
+/* -*- C++ -*- */
// $Id$
#include "ace/SUN_Proactor.h"
@@ -13,7 +14,8 @@
#endif /* __ACE_INLINE__ */
ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations)
- : ACE_POSIX_AIOCB_Proactor (max_aio_operations , 0)
+ : ACE_POSIX_AIOCB_Proactor (max_aio_operations , 0),
+ condition_ (mutex_)
{
// To provide correct virtual calls.
create_notify_manager ();
@@ -29,6 +31,8 @@ ACE_SUN_Proactor::~ACE_SUN_Proactor (void)
int
ACE_SUN_Proactor::handle_events (ACE_Time_Value &wait_time)
{
+ // Decrement <wait_time> with the amount of time spent in the method
+ ACE_Countdown_Time countdown (&wait_time);
return this->handle_events (wait_time.msec ());
}
@@ -38,31 +42,73 @@ ACE_SUN_Proactor::handle_events (void)
return this->handle_events (ACE_INFINITE);
}
+int ACE_SUN_Proactor::wait_for_start (ACE_Time_Value * abstime)
+{
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, -1));
+
+ if (num_started_aio_ != 0) // double check
+ return 0;
+
+ return condition_.wait (abstime) ;
+
+#else
+
+ return 0; // or -1 ???
+
+#endif /* ACE_MT_SAFE */
+}
+
int
ACE_SUN_Proactor::handle_events (u_long milli_seconds)
{
aio_result_t *result = 0;
if (milli_seconds == ACE_INFINITE)
- result = aiowait (0);
+ {
+ if (num_started_aio_ == 0)
+ wait_for_start (0);
+
+ result = aiowait (0);
+ }
else
{
struct timeval timeout;
timeout.tv_sec = milli_seconds / 1000;
timeout.tv_usec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000;
+
+ if (num_started_aio_ == 0)
+ {
+ ACE_Time_Value tv (timeout);
+
+ tv += ACE_OS::gettimeofday ();
+
+ wait_for_start (&tv);
+ }
result = aiowait (&timeout);
}
+ if (ACE_reinterpret_cast (long, result) == 0)
+ return 0; // timeout
+
if (ACE_reinterpret_cast (long, result) == -1)
+ {
// Check errno for EINVAL,EAGAIN,EINTR ??
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_SUN_Proactor::handle_events:"
- "aiowait failed"),
- 0);
-
- if (ACE_reinterpret_cast (long, result) == -1)
- return 0; // timeout
+ switch (errno)
+ {
+ case EINTR : // aiowait() was interrupted by a signal.
+ case EINVAL: //There are no outstanding asynchronous I/O requests.
+ return 0;
+
+ default: // EFAULT
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p \nNumAIO=%d\n",
+ "ACE_SUN_Proactor::handle_events: aiowait failed",
+ num_started_aio_),
+ -1);
+ }
+ }
int error_status = 0;
int return_status = 0;
@@ -96,16 +142,17 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
return_status= 0;
// we call find_completed_aio always with result != 0
-
+
for (ai = 0; ai < aiocb_list_max_size_; ai++)
- if (result == &aiocb_list_[ai]->aio_resultp)
+ if (aiocb_list_[ai] !=0 && //check for non zero
+ result == &aiocb_list_[ai]->aio_resultp)
break;
-
+
if (ai >= aiocb_list_max_size_) // not found
return 0;
error_status = result->aio_errno;
- return_status= result->aio_return;
+ return_status= result->aio_return;
if (error_status == -1) // should never be
{
@@ -114,27 +161,44 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
"ACE_SUN_Proactor::find_completed_aio:"
"<aio_errno> has failed\n"));
- aiocb_list_[ai] = 0;
- result_list_[ai] = 0;
- aiocb_list_cur_size_--;
-
- return 0;
- }
+ return_status = 0;
- if (error_status == EINPROGRESS) // should never be
- return 0;
+ // we should notify user, otherwise :
+ // memory leak for result and "hanging" user
+ // what was before :
- if (error_status == ECANCELED)
- return_status = 0;
+ // aiocb_list_[ai] = 0;
+ // result_list_[ai] = 0;
+ // aiocb_list_cur_size_--;
+ // return 0;
+ }
- if (return_status == -1)
+ switch (error_status)
{
- // was ACE_ERROR_RETURN
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_SUN_Proactor::find_completed_aio:"
- "<aio_return> failed\n"));
- return_status = 0; // zero bytes transferred
+ case EINPROGRESS : // should never be
+ case AIO_INPROGRESS : // according to SUN doc
+ return 0;
+
+ case ECANCELED : // canceled
+ return_status = 0;
+ break;
+
+ case 0 : // no error
+ if (return_status == -1) // return_status should be >= 0
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_SUN_Proactor::find_completed_aio:"
+ "<aio_return> failed\n"));
+
+ return_status = 0; // zero bytes transferred
+ }
+ break;
+
+ default : // other errors
+ if (return_status == -1) // normal status for I/O Error
+ return_status = 0; // zero bytes transferred
+ break;
}
ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai];
@@ -143,12 +207,22 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
result_list_[ai] = 0;
aiocb_list_cur_size_--;
+ num_started_aio_ --;
+
+ start_deferred_aio ();
+ //make attempt to start deferred AIO
+ //It is safe as we are protected by mutex_
+
return asynch_result;
}
+// start_aio has new return codes
+// 0 successful start
+// 1 try later, OS queue overflow
+// -1 invalid request and other errors
+
int
-ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
- int op)
+ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result)
{
ACE_TRACE ("ACE_SUN_Proactor::start_aio");
@@ -157,9 +231,9 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
// Start IO
- switch (op)
+ switch (result->aio_lio_opcode)
{
- case 0 :
+ case LIO_READ :
ptype = "read";
ret_val = aioread (result->aio_fildes,
(char *) result->aio_buf,
@@ -169,7 +243,7 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
&result->aio_resultp);
break;
- case 1 :
+ case LIO_WRITE :
ptype = "write";
ret_val = aiowrite (result->aio_fildes,
(char *) result->aio_buf,
@@ -184,14 +258,62 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
ret_val = -1;
break;
}
-
- if (ret_val == -1)
- ACE_ERROR ((LM_ERROR,
+
+ if (ret_val == 0)
+ {
+ num_started_aio_ ++ ;
+ if (num_started_aio_ == 1) // wake up condition
+ condition_.broadcast ();
+ }
+ else // if (ret_val == -1)
+ {
+ if (errno == EAGAIN) //try later, it will be deferred AIO
+ ret_val = 1 ;
+ else
+ ACE_ERROR ((LM_ERROR,
"%N:%l:(%P | %t)::start_aio: aio%s %p\n",
ptype,
"queueing failed\n"));
+ }
+
return ret_val;
}
-#endif /* ACE_HAS_AIO_CALLS && sun */
+int
+ACE_SUN_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result)
+{
+ ACE_UNUSED_ARG (result);
+ return 2 ; // not implemented
+
+// AL
+// I tried to implement the following code
+// But result was : aiocancel returned -1 with errno=ACCESS_DENIED
+// moreover, later this operation had been never finished
+// on aiowait .
+// Is it Sun error ??
+//
+// So with SUN_Proactor there is only one way to cancel AIO
+// just close the file handle.
+//
+//
+// int rc = ::aiocancel (& result->aio_resultp);
+//
+// Check the return value and return 0/1/2 appropriately.
+// if (rc == 0) // AIO_CANCELED
+// return 0;
+//
+// ACE_ERROR_RETURN ((LM_ERROR,
+// "%N:%l:(%P | %t)::%p\n",
+// "cancel_aiocb:"
+// "Unexpected result from <aiocancel>"),
+// -1);
+}
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Condition<ACE_Thread_Mutex>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+// These are only instantiated with ACE_HAS_THREADS.
+#pragma instantiate ACE_Condition<ACE_Thread_Mutex>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
+#endif /* ACE_HAS_AIO_CALLS && sun */
diff --git a/ace/SUN_Proactor.h b/ace/SUN_Proactor.h
index 5bb8feb08b9..78d2d06ee02 100644
--- a/ace/SUN_Proactor.h
+++ b/ace/SUN_Proactor.h
@@ -64,11 +64,9 @@ public:
/// Destructor.
virtual ~ACE_SUN_Proactor (void);
- // @@ Alex, this shouldn't be a magic number, i.e., it should be a
- // constant, such as ACE_MAX_AIO_OPERATIONS.
/// Constructor defines max number asynchronous operations that can
/// be started at the same time.
- ACE_SUN_Proactor (size_t max_aio_operations = 512);
+ ACE_SUN_Proactor (size_t max_aio_operations = ACE_AIO_DEFAULT_SIZE);
protected:
/**
@@ -96,12 +94,25 @@ protected:
virtual int handle_events (void);
/// From ACE_POSIX_AIOCB_Proactor.
- virtual int start_aio (ACE_POSIX_Asynch_Result *result, int op);
+ virtual int start_aio (ACE_POSIX_Asynch_Result *result);
/// Extract the results of aio.
ACE_POSIX_Asynch_Result *find_completed_aio (aio_result_t *result,
int &error_status,
int &return_status);
+
+ /// From ACE_POSIX_AIOCB_Proactor.
+ /// Attempt to cancel running request
+ virtual int cancel_aiocb ( ACE_POSIX_Asynch_Result * result );
+
+ /// Specific Sun aiowait
+ int wait_for_start (ACE_Time_Value * abstime);
+
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+ /// Condition variable .
+ /// used to wait the first AIO start
+ ACE_Condition<ACE_Thread_Mutex> condition_;
+#endif /* ACE_MT_SAFE */
};
#if defined (__ACE_INLINE__)