diff options
-rw-r--r-- | ChangeLog | 46 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 46 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 46 | ||||
-rw-r--r-- | THANKS | 3 | ||||
-rw-r--r-- | ace/ACE.cpp | 62 | ||||
-rw-r--r-- | ace/ACE.h | 10 | ||||
-rw-r--r-- | ace/Asynch_Acceptor.cpp | 13 | ||||
-rw-r--r-- | ace/Asynch_Acceptor.h | 10 | ||||
-rw-r--r-- | ace/POSIX_Asynch_IO.cpp | 532 | ||||
-rw-r--r-- | ace/POSIX_Asynch_IO.h | 129 | ||||
-rw-r--r-- | ace/POSIX_Proactor.cpp | 493 | ||||
-rw-r--r-- | ace/POSIX_Proactor.h | 53 | ||||
-rw-r--r-- | ace/SUN_Proactor.cpp | 202 | ||||
-rw-r--r-- | ace/SUN_Proactor.h | 19 |
14 files changed, 1006 insertions, 658 deletions
diff --git a/ChangeLog b/ChangeLog index efba375ef71..3f69f38b430 100644 --- a/ChangeLog +++ b/ChangeLog @@ -9,6 +9,21 @@ Tue Jan 2 10:25:30 2001 Balachandran Natarajan <bala@cs.wustl.edu> Edan Ayal <edan@bandwiz.com> for ACE_Bounded_Cached_Connect_Strategy::find_or_create_svc_handler_i (). +Tue Jan 2 09:03:18 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + + * ace/Asynch_Acceptor.{h,cpp}, + ace/POSIX_Async_IO.{h,cpp}, + ace/POSIX_Proactor.{h,cpp}, + ace/SUN_Proactor.{h,cpp}: Added enhanced versions of ACE_POSIX_Proactor + and ACE_SUN_Proactor to fix various problems uncovered and fixed by + Alexander Libman <Alibman@baltimore.com>. + + * ace/ACE.{h,cpp}: Fixed the timestamp() method so that it works + identically on Win32 and on UNIX. Also provided a new option + that'll make it possible to return a pointer to the beginning of + the time portion of "date and time." Thanks to Michael Searles + <msearles@base16.com> for contributing these fixes. + Mon Jan 01 16:35:00 2000 Michael Kircher <Micahel.Kircher@mchp.siemens.de> * tests/Reader_Writer_Test.cpp: Added a #ifdef to check for native @@ -219,28 +234,7 @@ Thu Dec 21 16:45:28 2000 Steve Huston <shuston@riverace.com> removed to satisfy Win32 which now pays attention to that arg. Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> -<<<<<<< ChangeLog -======= - - * ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where - next_earliest_time should have been next_earliest. Thanks to - Keith Brown <kalbrown@ix.netcom.com> for reporting this. - - * examples/IPC_SAP/SOCK_SAP/Makefile.bor: Reenable CPP-inserver-fancy.cpp - now that it compiles! Thanks to Johnny Willemsen - <johnny.willemsen@meco.nl> for confirming this. - - * examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.cpp (handle_events): Added - casts for (SOCKET) when using FD_SET to work around problems - with Borland. Thanks to Johnny Willemsen - <johnny.willemsen@meco.nl> for reporting this. - * ace/Message_Block.h (ACE_Dynamic_Message_Strategy): Fixed a bunch - of non-const accessor methods to be const. Thanks to Johnny - Willemsen <johnny.willemsen@meco.nl> for reporting this. ->>>>>>> 4.1393 - -<<<<<<< ChangeLog * ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where next_earliest_time should have been next_earliest. Thanks to Keith Brown <kalbrown@ix.netcom.com> for reporting this. @@ -266,16 +260,6 @@ Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> broadcast() don't release the mutex. Thanks to Mike Curtis <mccurry@my-deja.com> for pointing this out. -======= - * examples/Naming/Makefile (BIN2): Added the test_open.cpp file - to the Makefile. Thanks to Johnny Willemsen - <johnny.willemsen@meco.nl> for reporting this. - - * docs/tutorials/016/page02.html: Clarify that signal() or - broadcast() don't release the mutex. Thanks to Mike Curtis - <mccurry@my-deja.com> for pointing this out. - ->>>>>>> 4.1393 Wed Dec 20 19:44:16 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> * examples/Reactor/WFMO_Reactor/test_abandoned.cpp, diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index efba375ef71..3f69f38b430 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -9,6 +9,21 @@ Tue Jan 2 10:25:30 2001 Balachandran Natarajan <bala@cs.wustl.edu> Edan Ayal <edan@bandwiz.com> for ACE_Bounded_Cached_Connect_Strategy::find_or_create_svc_handler_i (). +Tue Jan 2 09:03:18 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + + * ace/Asynch_Acceptor.{h,cpp}, + ace/POSIX_Async_IO.{h,cpp}, + ace/POSIX_Proactor.{h,cpp}, + ace/SUN_Proactor.{h,cpp}: Added enhanced versions of ACE_POSIX_Proactor + and ACE_SUN_Proactor to fix various problems uncovered and fixed by + Alexander Libman <Alibman@baltimore.com>. + + * ace/ACE.{h,cpp}: Fixed the timestamp() method so that it works + identically on Win32 and on UNIX. Also provided a new option + that'll make it possible to return a pointer to the beginning of + the time portion of "date and time." Thanks to Michael Searles + <msearles@base16.com> for contributing these fixes. + Mon Jan 01 16:35:00 2000 Michael Kircher <Micahel.Kircher@mchp.siemens.de> * tests/Reader_Writer_Test.cpp: Added a #ifdef to check for native @@ -219,28 +234,7 @@ Thu Dec 21 16:45:28 2000 Steve Huston <shuston@riverace.com> removed to satisfy Win32 which now pays attention to that arg. Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> -<<<<<<< ChangeLog -======= - - * ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where - next_earliest_time should have been next_earliest. Thanks to - Keith Brown <kalbrown@ix.netcom.com> for reporting this. - - * examples/IPC_SAP/SOCK_SAP/Makefile.bor: Reenable CPP-inserver-fancy.cpp - now that it compiles! Thanks to Johnny Willemsen - <johnny.willemsen@meco.nl> for confirming this. - - * examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.cpp (handle_events): Added - casts for (SOCKET) when using FD_SET to work around problems - with Borland. Thanks to Johnny Willemsen - <johnny.willemsen@meco.nl> for reporting this. - * ace/Message_Block.h (ACE_Dynamic_Message_Strategy): Fixed a bunch - of non-const accessor methods to be const. Thanks to Johnny - Willemsen <johnny.willemsen@meco.nl> for reporting this. ->>>>>>> 4.1393 - -<<<<<<< ChangeLog * ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where next_earliest_time should have been next_earliest. Thanks to Keith Brown <kalbrown@ix.netcom.com> for reporting this. @@ -266,16 +260,6 @@ Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> broadcast() don't release the mutex. Thanks to Mike Curtis <mccurry@my-deja.com> for pointing this out. -======= - * examples/Naming/Makefile (BIN2): Added the test_open.cpp file - to the Makefile. Thanks to Johnny Willemsen - <johnny.willemsen@meco.nl> for reporting this. - - * docs/tutorials/016/page02.html: Clarify that signal() or - broadcast() don't release the mutex. Thanks to Mike Curtis - <mccurry@my-deja.com> for pointing this out. - ->>>>>>> 4.1393 Wed Dec 20 19:44:16 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> * examples/Reactor/WFMO_Reactor/test_abandoned.cpp, diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index efba375ef71..3f69f38b430 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -9,6 +9,21 @@ Tue Jan 2 10:25:30 2001 Balachandran Natarajan <bala@cs.wustl.edu> Edan Ayal <edan@bandwiz.com> for ACE_Bounded_Cached_Connect_Strategy::find_or_create_svc_handler_i (). +Tue Jan 2 09:03:18 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + + * ace/Asynch_Acceptor.{h,cpp}, + ace/POSIX_Async_IO.{h,cpp}, + ace/POSIX_Proactor.{h,cpp}, + ace/SUN_Proactor.{h,cpp}: Added enhanced versions of ACE_POSIX_Proactor + and ACE_SUN_Proactor to fix various problems uncovered and fixed by + Alexander Libman <Alibman@baltimore.com>. + + * ace/ACE.{h,cpp}: Fixed the timestamp() method so that it works + identically on Win32 and on UNIX. Also provided a new option + that'll make it possible to return a pointer to the beginning of + the time portion of "date and time." Thanks to Michael Searles + <msearles@base16.com> for contributing these fixes. + Mon Jan 01 16:35:00 2000 Michael Kircher <Micahel.Kircher@mchp.siemens.de> * tests/Reader_Writer_Test.cpp: Added a #ifdef to check for native @@ -219,28 +234,7 @@ Thu Dec 21 16:45:28 2000 Steve Huston <shuston@riverace.com> removed to satisfy Win32 which now pays attention to that arg. Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> -<<<<<<< ChangeLog -======= - - * ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where - next_earliest_time should have been next_earliest. Thanks to - Keith Brown <kalbrown@ix.netcom.com> for reporting this. - - * examples/IPC_SAP/SOCK_SAP/Makefile.bor: Reenable CPP-inserver-fancy.cpp - now that it compiles! Thanks to Johnny Willemsen - <johnny.willemsen@meco.nl> for confirming this. - - * examples/IPC_SAP/SOCK_SAP/CPP-inserver-fancy.cpp (handle_events): Added - casts for (SOCKET) when using FD_SET to work around problems - with Borland. Thanks to Johnny Willemsen - <johnny.willemsen@meco.nl> for reporting this. - * ace/Message_Block.h (ACE_Dynamic_Message_Strategy): Fixed a bunch - of non-const accessor methods to be const. Thanks to Johnny - Willemsen <johnny.willemsen@meco.nl> for reporting this. ->>>>>>> 4.1393 - -<<<<<<< ChangeLog * ace/Timer_Wheel_T.cpp (expire): Fixed a mistake where next_earliest_time should have been next_earliest. Thanks to Keith Brown <kalbrown@ix.netcom.com> for reporting this. @@ -266,16 +260,6 @@ Thu Dec 21 05:50:51 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> broadcast() don't release the mutex. Thanks to Mike Curtis <mccurry@my-deja.com> for pointing this out. -======= - * examples/Naming/Makefile (BIN2): Added the test_open.cpp file - to the Makefile. Thanks to Johnny Willemsen - <johnny.willemsen@meco.nl> for reporting this. - - * docs/tutorials/016/page02.html: Clarify that signal() or - broadcast() don't release the mutex. Thanks to Mike Curtis - <mccurry@my-deja.com> for pointing this out. - ->>>>>>> 4.1393 Wed Dec 20 19:44:16 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> * examples/Reactor/WFMO_Reactor/test_abandoned.cpp, @@ -1105,7 +1105,8 @@ Kent Stewart <kbstew99@hotmail.com> Alexander Kogan <jk@kogan.nnov.ru> Michael Lindner <mikel@att.net> Arnaud Compan <compan@ipanematech.com> -Frank A. Hunleth" <fhunleth@cs.wustl.edu> +Frank A. Hunleth <fhunleth@cs.wustl.edu> +Michael Searles <msearles@base16.com> Bogdan Jeram <bjeram@eso.org> I would particularly like to thank Paul Stephenson, who worked with me diff --git a/ace/ACE.cpp b/ace/ACE.cpp index a8c83390d63..c1a235385e5 100644 --- a/ace/ACE.cpp +++ b/ace/ACE.cpp @@ -2394,13 +2394,12 @@ ACE::format_hexdump (const char *buffer, // Returns the current timestamp in the form // "hour:minute:second:microsecond." The month, day, and year are -// also stored in the beginning of the date_and_time array. Returns 0 -// if unsuccessful, else returns pointer to beginning of the "time" -// portion of <day_and_time>. +// also stored in the beginning of the date_and_time array. ACE_TCHAR * ACE::timestamp (ACE_TCHAR date_and_time[], - int date_and_timelen) + int date_and_timelen, + int return_pointer_to_first_digit) { //ACE_TRACE ("ACE::timestamp"); @@ -2411,22 +2410,43 @@ ACE::timestamp (ACE_TCHAR date_and_time[], } #if defined (WIN32) - // @@ Jesper, I think Win32 supports all the UNIX versions below. - // Therefore, we can probably remove this WIN32 ifdef altogether. - SYSTEMTIME local; - ::GetLocalTime (&local); - - ACE_OS::sprintf (date_and_time, - ACE_LIB_TEXT ("%02d/%02d/%04d %02d.%02d.%02d.%06d"), - (int) local.wMonth, // new, also the %02d in sprintf - (int) local.wDay, // new, also the %02d in sprintf - (int) local.wYear, // new, also the %02d in sprintf - (int) local.wHour, - (int) local.wMinute, - (int) local.wSecond, - (int) local.wMilliseconds * 1000); - date_and_time[26] = '\0'; - return &date_and_time[11]; + // Emulate Unix. Win32 does NOT support all the UNIX versions + // below, so DO we need this ifdef. + static const char *day_of_week_name[] = { + ACE_LIB_TEXT ("Sun"), + ACE_LIB_TEXT ("Mon"), + ACE_LIB_TEXT ("Tue"), + ACE_LIB_TEXT ("Wed"), + ACE_LIB_TEXT ("Thr"), + ACE_LIB_TEXT ("Fri"), + ACE_LIB_TEXT ("Sat") + }; + + static const char *month_name[] = { + ACE_LIB_TEXT ("Jan"), + ACE_LIB_TEXT ("Feb"), + ACE_LIB_TEXT ("Mar"), + ACE_LIB_TEXT ("Apr"), + ACE_LIB_TEXT ("May"), + ACE_LIB_TEXT ("Jun"), + ACE_LIB_TEXT ("Jul"), + ACE_LIB_TEXT ("Aug"), + ACE_LIB_TEXT ("Sep"), + ACE_LIB_TEXT ("Oct"), + ACE_LIB_TEXT ("Nov"), + ACE_LIB_TEXT ("Dec") }; + + ACE_OS::sprintf (date_and_time, + ACE_LIB_TEXT ("%3s %3s %2d %04d %02d:%02d:%02d.%06d"), + day_of_week_name[local.wDayOfWeek], + month_name[local.wMonth - 1], + (int) local.wDay, + (int) local.wYear, + (int) local.wHour, + (int) local.wMinute, + (int) local.wSecond, + (int) (local.wMilliseconds * 1000)); + return &date_and_time[15 + (return_pointer_to_first_digit != 0)]; #else /* UNIX */ ACE_TCHAR timebuf[26]; // This magic number is based on the ctime(3c) man page. ACE_Time_Value cur_time = ACE_OS::gettimeofday (); @@ -2454,7 +2474,7 @@ ACE::timestamp (ACE_TCHAR date_and_time[], timetmp, cur_time.usec ()); date_and_time[33] = '\0'; - return &date_and_time[15]; + return &date_and_time[15 + (return_pointer_to_first_digit != 0)]; #endif /* WIN32 */ } diff --git a/ace/ACE.h b/ace/ACE.h index 4e52189602b..a5f3601d579 100644 --- a/ace/ACE.h +++ b/ace/ACE.h @@ -35,7 +35,7 @@ class ACE_Message_Block; * @class ACE * * @brief Contains value added ACE methods that extend the behavior - * of the UNIX and Win32 OS calls. + * of the UNIX and Win32 OS calls. * * This class consolidates all these ACE static methods in a * single place in order to manage the namespace better. These @@ -408,10 +408,14 @@ public: * "hour:minute:second:microsecond." The month, day, and year are * also stored in the beginning of the date_and_time array. Returns * 0 if unsuccessful, else returns pointer to beginning of the - * "time" portion of <day_and_time>. + * "time" portion of <day_and_time>. If + * <return_pointer_to_first_digit> is 0 then return a pointer to the + * space before the time, else return a pointer to the beginning of + * the time portion. */ static ACE_TCHAR *timestamp (ACE_TCHAR date_and_time[], - int time_len); + int time_len, + int return_pointer_to_first_digit = 0); /** * if <avoid_zombies> == 0 call <ACE_OS::fork> directly, else create diff --git a/ace/Asynch_Acceptor.cpp b/ace/Asynch_Acceptor.cpp index 1da5c30086d..c32b8b330ed 100644 --- a/ace/Asynch_Acceptor.cpp +++ b/ace/Asynch_Acceptor.cpp @@ -1,3 +1,4 @@ +/* -*- C++ -*- */ // $Id$ #ifndef ACE_ASYNCH_ACCEPTOR_C @@ -183,8 +184,8 @@ ACE_Asynch_Acceptor<HANDLER>::handle_accept (const ACE_Asynch_Accept::Result &re int error = 0; // If the asynchronous accept fails. - if (!error && - !result.success ()) + if (!result.success () || + result.accept_handle() == ACE_INVALID_HANDLE ) { error = 1; ACE_ERROR ((LM_ERROR, @@ -268,7 +269,8 @@ ACE_Asynch_Acceptor<HANDLER>::handle_accept (const ACE_Asynch_Accept::Result &re } // On failure, no choice but to close the socket - if (error) + if (error && + result.accept_handle() != ACE_INVALID_HANDLE ) ACE_OS::closesocket (result.accept_handle ()); // Delete the dynamically allocated message_block @@ -300,7 +302,10 @@ ACE_Asynch_Acceptor<HANDLER>::cancel (void) || (defined (__BORLANDC__) && (__BORLANDC__ >= 0x530))) return (int) ::CancelIo (this->listen_handle_); #else - ACE_NOTSUP_RETURN (-1); + //ACE_NOTSUP_RETURN (-1); + // Supported now + return this->asynch_accept_.cancel(); + #endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && ((defined (_MSC_VER) && (_MSC_VER > 1020)) || (defined (__BORLANDC__) && (__BORLANDC__ >= 0x530))) */ } diff --git a/ace/Asynch_Acceptor.h b/ace/Asynch_Acceptor.h index 835ed63ec5b..cd8b286b6c3 100644 --- a/ace/Asynch_Acceptor.h +++ b/ace/Asynch_Acceptor.h @@ -1,3 +1,4 @@ +/* -*- C++ -*- */ //============================================================================= /** @@ -9,7 +10,6 @@ */ //============================================================================= - #ifndef ACE_ASYNCH_ACCEPTOR_H #define ACE_ASYNCH_ACCEPTOR_H #include "ace/pre.h" @@ -94,8 +94,12 @@ public: /** * This cancels all pending accepts operations that were issued by - * the calling thread. The function does not cancel accept - * operations issued by other threads. + * the calling thread. + * Windows NT- The function does not cancel accept operations + * issued by other threads + * POSIX - all OK, it delegates cancelation to the + * ACE_POSIX_Asynch_Accept + * */ virtual int cancel (void); diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp index 3c279596cb4..366fbf31a10 100644 --- a/ace/POSIX_Asynch_IO.cpp +++ b/ace/POSIX_Asynch_IO.cpp @@ -1,3 +1,4 @@ +/* -*- C++ -*- */ // $Id$ #include "ace/POSIX_Asynch_IO.h" @@ -20,6 +21,12 @@ ACE_POSIX_Asynch_Result::bytes_transferred (void) const return this->bytes_transferred_; } +void +ACE_POSIX_Asynch_Result::set_bytes_transferred (u_long nbytes) +{ + this->bytes_transferred_= nbytes; +} + const void * ACE_POSIX_Asynch_Result::act (void) const { @@ -44,6 +51,11 @@ ACE_POSIX_Asynch_Result::error (void) const return this->error_; } +void +ACE_POSIX_Asynch_Result::set_error (u_long errcode) +{ + this->error_=errcode; +} ACE_HANDLE ACE_POSIX_Asynch_Result::event (void) const { @@ -114,7 +126,7 @@ ACE_POSIX_Asynch_Result::ACE_POSIX_Asynch_Result (ACE_Handler &handler, aio_sigevent.sigev_signo = signal_number; // Event is not used on POSIX. -ACE_UNUSED_ARG (event); + ACE_UNUSED_ARG (event); // // @@ Support offset_high with aiocb64. @@ -178,12 +190,27 @@ ACE_POSIX_Asynch_Operation::cancel (void) if (!p_impl) return -1; - // For <ACE_SUN_Proactor> this function is not implemented yet. We - // should call aiocancel instead of aio_cancel but we have not got - // information about aio_result_t. + // For ACE_SUN_Proactor and ACE_POSIX_AIOCB_Proactor + // we should call cancel_aio (this->handle_) + // method to cancel correctly all deferred AIOs - if (p_impl->get_impl_type () == ACE_POSIX_Proactor::PROACTOR_SUN) - return -1; + switch ( p_impl->get_impl_type ()) + { + case ACE_POSIX_Proactor::PROACTOR_SUN: + case ACE_POSIX_Proactor::PROACTOR_AIOCB: + { + ACE_POSIX_AIOCB_Proactor * p_impl_aiocb = ACE_dynamic_cast + (ACE_POSIX_AIOCB_Proactor *, + p_impl ); + + if ( ! p_impl_aiocb) + return -1; + + return p_impl_aiocb->cancel_aio (this->handle_); + } + default: + break; + } int result = ::aio_cancel (this->handle_, 0); @@ -1680,18 +1707,16 @@ ACE_POSIX_Asynch_Accept_Result::post_completion (ACE_Proactor_Impl *proactor) class ACE_Export ACE_POSIX_Asynch_Accept_Handler : public ACE_Event_Handler { // = TITLE - // For the POSIX implementation, we have two helper classes - // (ACE_POSIX_AIOCB_Asynch_Accept_Hander and - // ACE_POSIX_SIG_Asynch_Accept_Handler) to do <Asynch_Accept>. This - // class abstracts out the commonalities on these two helper classes. + // For the POSIX implementation this class is common + // for all Proactors (AIOCB/SIG/SUN) // // = DESCRIPTION // + public: ~ACE_POSIX_Asynch_Accept_Handler (void); // Destructor. -protected: ACE_POSIX_Asynch_Accept_Handler (ACE_Reactor* reactor, ACE_POSIX_Proactor *posix_proactor); // Constructor. Give the reactor so that it can activate/deactivate @@ -1699,22 +1724,29 @@ protected: // handler can send the <POSIX_Asynch_Accept> result block through // <post_completion>. - int register_accept_call_i (ACE_POSIX_Asynch_Accept_Result* result); + int cancel_uncompleted (int flg_notify); + // flg_notify points whether or not we should send notification about + // canceled accepts + + + int register_accept_call (ACE_POSIX_Asynch_Accept_Result* result); // Worker method for registering this <accept> call with the local - // handler. This method has the common code found between the two - // differnt implementation subclasses. This method assumes that - // locks are already obtained to access the shared the queues. + // handler. This method obtains lock to access the shared the queues. ACE_POSIX_Asynch_Accept_Result* deregister_accept_call (void); // Method for deregistering. + int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); + // Called when accept event comes up on <listen_hanlde> + +protected: ACE_Reactor* reactor_; // Reactor used by the Asynch_Accept. We need this here to enable // and disable the <handle> now and then, depending on whether any // <accept> is pending or no. ACE_POSIX_Proactor *posix_proactor_; - // POSIX_Proactor. + // POSIX_Proactor implementation. ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result*> result_queue_; // Queue of Result pointers that correspond to all the <accept>'s @@ -1729,75 +1761,27 @@ protected: // ********************************************************************* -class ACE_Export ACE_POSIX_AIOCB_Asynch_Accept_Handler : public ACE_POSIX_Asynch_Accept_Handler -{ - // = TITLE - // For the POSIX implementation, this class takes care of doing - // <Asynch_Accept> for AIOCB strategy. - // - // = DESCRIPTION - // -public: - ACE_POSIX_AIOCB_Asynch_Accept_Handler (ACE_Reactor *reactor, - ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor); - // Constructor. Give the reactor so that it can activate/deactivate - // the handlers. Give also the proactor used here, so that the - // handler can send information through the notification pipe - // (<post_completion>). - - ~ACE_POSIX_AIOCB_Asynch_Accept_Handler (void); - // Destructor. - - int register_accept_call (ACE_POSIX_Asynch_Accept_Result* result); - // Register this <accept> call with the local handler. - - virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); - // Called when accept event comes up on the <listen_handle>. -}; - -// ********************************************************************* - -class ACE_Export ACE_POSIX_SIG_Asynch_Accept_Handler : public ACE_POSIX_Asynch_Accept_Handler -{ - // = TITLE - // For the POSIX implementation, this class takes care of doing - // Asynch_Accept. - // - // = DESCRIPTION - // -public: - ACE_POSIX_SIG_Asynch_Accept_Handler (ACE_Reactor* reactor, - ACE_POSIX_SIG_Proactor *posix_sig_proactor); - // Constructor. Give the reactor so that it can activate/deactivate - // the handlers. Give also the proactor used here, so that the - // handler can send information through <post_completion>. - - ~ACE_POSIX_SIG_Asynch_Accept_Handler (void); - // Destructor. - - int register_accept_call (ACE_POSIX_Asynch_Accept_Result *result); - // Register this <accept> call with the local handler. - - virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); - // Called when accept event comes up on the <listen_handle>. -}; - -// ********************************************************************* - ACE_POSIX_Asynch_Accept_Handler::ACE_POSIX_Asynch_Accept_Handler (ACE_Reactor* reactor, ACE_POSIX_Proactor *posix_proactor) : reactor_ (reactor), posix_proactor_ (posix_proactor) { + ACE_TRACE ("ACE_POSIX_Asynch_Accept_Handler::ctor"); } ACE_POSIX_Asynch_Accept_Handler::~ACE_POSIX_Asynch_Accept_Handler (void) { + ACE_TRACE ("ACE_POSIX_Asynch_Accept_Handler::dtor"); } int -ACE_POSIX_Asynch_Accept_Handler::register_accept_call_i (ACE_POSIX_Asynch_Accept_Result* result) +ACE_POSIX_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Accept_Result* result) { + // The queue is updated by main thread in the register function call + // and thru the auxillary thread in the deregister fun. So let us + // mutex it. + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + // Insert this result to the queue. int insert_result = this->result_queue_.enqueue_tail (result); if (insert_result == -1) @@ -1830,7 +1814,7 @@ ACE_POSIX_Asynch_Accept_Handler::deregister_accept_call (void) // The queue is updated by main thread in the register function call and // thru the auxillary thread in the deregister fun. So let us mutex // it. - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0); + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); // Get the first item (result ptr) from the Queue. ACE_POSIX_Asynch_Accept_Result* result = 0; @@ -1858,102 +1842,60 @@ ACE_POSIX_Asynch_Accept_Handler::deregister_accept_call (void) return result; } -// ********************************************************************* - -ACE_POSIX_AIOCB_Asynch_Accept_Handler::ACE_POSIX_AIOCB_Asynch_Accept_Handler (ACE_Reactor* reactor, - ACE_POSIX_AIOCB_Proactor* posix_aiocb_proactor) - : ACE_POSIX_Asynch_Accept_Handler (reactor, posix_aiocb_proactor) -{ -} - -ACE_POSIX_AIOCB_Asynch_Accept_Handler::~ACE_POSIX_AIOCB_Asynch_Accept_Handler (void) -{ -} +//@@ New method cancel_uncompleted +// It performs cancellation of all pending requests +// +// Parameter flg_notify can be +// 0 - don't send notifications about canceled accepts +// !0 - notify user about canceled accepts +// according POSIX standards we should receive notifications +// on canceled AIO requests +// +// Return value : number of cancelled requests +// + int -ACE_POSIX_AIOCB_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Accept_Result* result) +ACE_POSIX_Asynch_Accept_Handler::cancel_uncompleted (int flg_notify) { - // The queue is updated by main thread in the register function call - // and thru the auxillary thread in the deregister fun. So let us - // mutex it. - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - return register_accept_call_i (result); -} - -int -ACE_POSIX_AIOCB_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */) -{ - // An <accept> has been sensed on the <listen_handle>. We should be - // able to just go ahead and do the <accept> now on this <fd>. This - // should be the same as the <listen_handle>. - - // Deregister this info pertaining to this <accept> call. - ACE_POSIX_Asynch_Accept_Result* result = this->deregister_accept_call (); - if (result == 0) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t):%p\n", - "ACE_POSIX_AIOCB_Asynch_Accept_Handler::" - "handle_input:deregister_accept_call failed"), - -1); - - // Issue <accept> now. - // @@ We shouldnt block here since we have already done poll/select - // thru reactor. But are we sure? - ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), 0, 0); - if (new_handle == ACE_INVALID_HANDLE) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t):%p\n", - "ACE_POSIX_AIOCB_Asynch_Accept_Handler::" - "handle_input:<accept> system call failed"), - -1); + int retval = 0; - // Accept has completed. - - // Store the new handle. - result->aio_fildes = new_handle; - - // Notify the main process about this completion - // Send the Result through the notification pipe. - if (this->posix_proactor_->post_completion (result) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p\n", - "ACE_POSIX_AIOCB_Asynch_Accept_Handler::" - "handle_input:<post_completion> failed"), - -1); + for ( ; ; retval++ ) + { + ACE_POSIX_Asynch_Accept_Result* result = 0; - return 0; -} + this->result_queue_.dequeue_head (result); -// ********************************************************************* + if (result == 0) + break; -ACE_POSIX_SIG_Asynch_Accept_Handler::ACE_POSIX_SIG_Asynch_Accept_Handler (ACE_Reactor* reactor, - ACE_POSIX_SIG_Proactor *posix_sig_proactor) - : ACE_POSIX_Asynch_Accept_Handler (reactor, posix_sig_proactor) -{ -} + this->reactor_->suspend_handler (result->listen_handle ()); + + if ( ! flg_notify ) //if we should not notify + delete result ; // we have to delete result + else //else notify as any cancelled AIO + { + // Store the new handle. + result->aio_fildes = ACE_INVALID_HANDLE ; + result->set_bytes_transferred (0); + result->set_error (ECANCELED); + + if (this->posix_proactor_->post_completion (result) == -1) + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "ACE_POSIX_Asynch_Accept_Handler::" + "cancel_uncompleted:<post_completion> failed")); + } + } -ACE_POSIX_SIG_Asynch_Accept_Handler::~ACE_POSIX_SIG_Asynch_Accept_Handler (void) -{ + return retval; } -int -ACE_POSIX_SIG_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Accept_Result* result) -{ - // The queue is updated by main thread in the register function call - // and thru the auxillary thread in the deregister fun. So let us - // mutex it. - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); - - // Do the work. - if (this->register_accept_call_i (result) == -1) - return -1; - - return 0; -} int -ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */) +ACE_POSIX_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */) { // An <accept> has been sensed on the <listen_handle>. We should be // able to just go ahead and do the <accept> now on this <fd>. This @@ -1964,7 +1906,7 @@ ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */) if (result == 0) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t):%p\n", - "ACE_POSIX_SIG_Asynch_Accept_Handler::" + "ACE_POSIX_AIOCB_Asynch_Accept_Handler::" "handle_input:deregister_accept_call failed"), -1); @@ -1973,40 +1915,46 @@ ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */) // thru reactor. But are we sure? ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), 0, 0); if (new_handle == ACE_INVALID_HANDLE) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p\n", - "ACE_POSIX_SIG_Asynch_Accept_Handler::" - "handle_input:<accept> system call failed"), - -1); + { + result->set_error( errno ); + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t):%p\n", + "ACE_POSIX_AIOCB_Asynch_Accept_Handler::" + "handle_input:<accept> system call failed")); - // Accept has completed. + // Notify client as usual, "AIO" finished with errors + } // Store the new handle. result->aio_fildes = new_handle; - // Notify the main process about this completion. + // Notify the main process about this completion + // Send the Result through the notification pipe. if (this->posix_proactor_->post_completion (result) == -1) ACE_ERROR_RETURN ((LM_ERROR, "Error:(%P | %t):%p\n", - "ACE_POSIX_SIG_Asynch_Accept_Handler::" + "ACE_POSIX_AIOCB_Asynch_Accept_Handler::" "handle_input:<post_completion> failed"), -1); return 0; } + // ********************************************************************* -ACE_POSIX_AIOCB_Asynch_Accept::ACE_POSIX_AIOCB_Asynch_Accept (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) +ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor *posix_proactor) : ACE_Asynch_Operation_Impl (), ACE_Asynch_Accept_Impl (), - ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor), - accept_handler_ (0) + ACE_POSIX_Asynch_Operation (), + accept_handler_ (0), + grp_id_(-1), //thread not spawn + posix_proactor_ (posix_proactor) //save concrete proactor impl. { } int -ACE_POSIX_AIOCB_Asynch_Accept::accept (ACE_Message_Block &message_block, +ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, u_long bytes_to_read, ACE_HANDLE accept_handle, const void *act, @@ -2032,7 +1980,7 @@ ACE_POSIX_AIOCB_Asynch_Accept::accept (ACE_Message_Block &message_block, message_block, bytes_to_read, act, - this->posix_proactor ()->get_handle (), + this->posix_proactor_->get_handle (), priority, signal_number), -1); @@ -2045,11 +1993,21 @@ ACE_POSIX_AIOCB_Asynch_Accept::accept (ACE_Message_Block &message_block, } int -ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler, +ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler, ACE_HANDLE handle, const void *completion_key, ACE_Proactor *proactor) { + // check for non zero accept_handler_ + // we could not create a new handler without closing the previous + + if ( this->accept_handler_ != 0 ) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:ACE_POSIX_Asynch_Accept::open:" + "accept_handler_ not null\n"), + -1); + + int result = ACE_POSIX_Asynch_Operation::open (handler, handle, completion_key, @@ -2060,8 +2018,8 @@ ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler, // Init the Asynch_Accept_Handler now. It needs to keep Proactor // also with it. ACE_NEW_RETURN (this->accept_handler_, - ACE_POSIX_AIOCB_Asynch_Accept_Handler (&this->reactor_, - this->posix_proactor ()), + ACE_POSIX_Asynch_Accept_Handler (&this->reactor_, + this->posix_proactor_), -1); // Register the handle with the reactor. @@ -2083,9 +2041,11 @@ ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler, // Spawn the thread. It is the only thread we are going to have. It // will do the <handle_events> on the reactor. - return_val = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_AIOCB_Asynch_Accept::thread_function, - ACE_reinterpret_cast (void *, &this->reactor_)); - if (return_val == -1) + // save group id of the created thread + + grp_id_ = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_Asynch_Accept::thread_function, + ACE_reinterpret_cast (void *, &this->reactor_)); + if (grp_id_ == -1) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:Thread_Manager::spawn failed\n"), -1); @@ -2093,12 +2053,73 @@ ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler, return 0; } -ACE_POSIX_AIOCB_Asynch_Accept::~ACE_POSIX_AIOCB_Asynch_Accept (void) +ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void) { + this->close (0); // not send notifications to user +} + +int +ACE_POSIX_Asynch_Accept::close ( int flg_notify) +{ + // 1. It performs cancellation of all pending requests + // 2. Stops and waits for the thread we had created + // 3. Removes accept_handler_ from Reactor + // 4. Deletes accept_handler_ + // 5. close the socket + // + // Parameter flg_notify can be + // 0 - don't send notifications about canceled accepts + // !0 - notify user about canceled accepts + // according POSIX standards we should receive notifications + // on canceled AIO requests + // + // Return codes : 0 - OK , + // -1 - Errors + + if ( this->accept_handler_ ) + this->accept_handler_->cancel_uncompleted ( flg_notify ); + + //stop and wait for the thread + + if ( grp_id_ != -1 ) + { + reactor_.end_reactor_event_loop (); + + if ( ACE_Thread_Manager::instance ()->wait_grp (grp_id_) ==-1) + ACE_ERROR ((LM_ERROR, + "%N:%l:Thread_Manager::wait_grp failed\n")); + else + grp_id_ = -1; + } + + //AL remove and destroy accept_handler_ + + if ( this->accept_handler_ != 0 ) + { + this->reactor_.remove_handler + ( this->accept_handler_, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL ); + + delete this->accept_handler_ ; + this->accept_handler_ = 0 ; + } + + // It looks like a good place to close listen handle here. + // But I am not sure with compatibility with the old programs + // You can comment the closure of the socket + + if ( this->handle_ != ACE_INVALID_HANDLE ) + { + ACE_OS::closesocket (this->handle_); + this->handle_=ACE_INVALID_HANDLE; + } + + return 0; } void* -ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor) +ACE_POSIX_Asynch_Accept::thread_function (void* arg_reactor) { // Retrieve the reactor pointer from the argument. ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *, @@ -2116,7 +2137,9 @@ ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor) // Handle events. int result = 0; - while (result != -1) + + //while (result != -1) + while ( reactor->reactor_event_loop_done () == 0 ) { result = reactor->handle_events (); } @@ -2128,151 +2151,36 @@ ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor) // the call to the ACE_POSIX_Asynch_Operation base class. int -ACE_POSIX_AIOCB_Asynch_Accept::cancel (void) -{ - return ACE_POSIX_Asynch_Operation::cancel (); -} - -ACE_Proactor * -ACE_POSIX_AIOCB_Asynch_Accept::proactor (void) const +ACE_POSIX_Asynch_Accept::cancel (void) { - return ACE_POSIX_Asynch_Operation::proactor (); -} + //We are not ACE_POSIX_Asynch_Operation + //so we could not call ::aiocancel () + //We delegate real cancelation to the accept_handler_ + // accept_handler_->cancel_uncompleted (1) -// ********************************************************************* + //return ACE_POSIX_Asynch_Operation::cancel (); -ACE_POSIX_SIG_Asynch_Accept::ACE_POSIX_SIG_Asynch_Accept (ACE_POSIX_SIG_Proactor *posix_sig_proactor) - : ACE_Asynch_Operation_Impl (), - ACE_Asynch_Accept_Impl (), - ACE_POSIX_SIG_Asynch_Operation (posix_sig_proactor), - accept_handler_ (0) -{ -} + if ( this->accept_handler_ == 0 ) + return 1 ; // AIO_ALLDONE -int -ACE_POSIX_SIG_Asynch_Accept::accept (ACE_Message_Block &message_block, - u_long bytes_to_read, - ACE_HANDLE accept_handle, - const void *act, - int priority, - int signal_number) -{ - // Sanity check: make sure that enough space has been allocated by - // the caller. - size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr); - size_t space_in_use = message_block.wr_ptr () - message_block.base (); - size_t total_size = message_block.size (); - size_t available_space = total_size - space_in_use; - size_t space_needed = bytes_to_read + 2 * address_size; - if (available_space < space_needed) - ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("Buffer too small\n")), -1); + //cancel with notifications as POSIX should do - // Common code for both WIN and POSIX. - ACE_POSIX_Asynch_Accept_Result *result = 0; - ACE_NEW_RETURN (result, - ACE_POSIX_Asynch_Accept_Result (*this->handler_, - this->handle_, - accept_handle, - message_block, - bytes_to_read, - act, - this->posix_sig_proactor_->get_handle (), - priority, - signal_number), - -1); + int retval = this->accept_handler_->cancel_uncompleted (1); - // Register this <accept> call with the local handler. - this->accept_handler_->register_accept_call (result); + //retval contains now the number of canceled requests + + if ( retval == 0 ) + return 1 ; // AIO_ALLDONE - return 0; -} + if ( retval > 0 ) + return 0; // AIO_CANCELED + return -1; -int -ACE_POSIX_SIG_Asynch_Accept::open (ACE_Handler &handler, - ACE_HANDLE handle, - const void *completion_key, - ACE_Proactor *proactor) -{ - int result = ACE_POSIX_Asynch_Operation::open (handler, - handle, - completion_key, - proactor); - if (result == -1) - return result; - - // Init the Asynch_Accept_Handler now. It needs to keep Proactor - // also with it. - ACE_NEW_RETURN (this->accept_handler_, - ACE_POSIX_SIG_Asynch_Accept_Handler (&this->reactor_, - this->posix_proactor ()), - -1); - - // Register the handle with the reactor. - int return_val = this->reactor_.register_handler (this->handle_, - this->accept_handler_, - ACE_Event_Handler::ACCEPT_MASK); - if (return_val == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:Reactor::register_handler failed\n"), - -1); - - // Suspend the <handle> now. Enable only when the <accept> is issued - // by the application. - return_val = this->reactor_.suspend_handler (this->handle_); - if (return_val == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:Reactor::suspend_handler failed\n"), - -1); - - // Spawn the thread. It is the only thread we are going to have. It - // will do the <handle_events> on the reactor. - return_val = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_SIG_Asynch_Accept::thread_function, - (void *)&this->reactor_); - if (return_val == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:Thread_Manager::spawn failed\n"), - -1); - - return 0; -} - -ACE_POSIX_SIG_Asynch_Accept::~ACE_POSIX_SIG_Asynch_Accept (void) -{ -} - -void* -ACE_POSIX_SIG_Asynch_Accept::thread_function (void* arg_reactor) -{ - // Retrieve the reactor pointer from the argument. - ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *, - arg_reactor); - if (reactor == 0) - reactor = ACE_Reactor::instance (); - - // For this reactor, this thread is the owner. - reactor->owner (ACE_OS::thr_self ()); - - // Handle events. Wait for any connection events. - int result = 0; - while (result != -1) - result = reactor->handle_events (); - - return 0; -} - -// Methods belong to ACE_POSIX_Asynch_Operation base class. These -// methods are defined here to avoid dominance warnings. They route -// the call to the ACE_POSIX_Asynch_Operation base class. - -int -ACE_POSIX_SIG_Asynch_Accept::cancel (void) -{ - return ACE_POSIX_Asynch_Operation::cancel (); } ACE_Proactor * -ACE_POSIX_SIG_Asynch_Accept::proactor (void) const +ACE_POSIX_Asynch_Accept::proactor (void) const { return ACE_POSIX_Asynch_Operation::proactor (); } diff --git a/ace/POSIX_Asynch_IO.h b/ace/POSIX_Asynch_IO.h index beac02277b0..9ede6b5c37a 100644 --- a/ace/POSIX_Asynch_IO.h +++ b/ace/POSIX_Asynch_IO.h @@ -1,4 +1,4 @@ -// -*- C++ -*- +/* -*- C++ -*- */ //============================================================================= /** @@ -98,6 +98,12 @@ public: /// Destructor. virtual ~ACE_POSIX_Asynch_Result (void); + /// Simulate error value to use in the post_completion () + void set_error (u_long errcode); + + /// Simulate value to use in the post_completion () + void set_bytes_transferred (u_long nbytes); + protected: /// Constructor. <Event> is not used on POSIX. ACE_POSIX_Asynch_Result (ACE_Handler &handler, @@ -607,7 +613,7 @@ public: int priority, int signal_number = 0); - /// Destrcutor. + /// Destructor. virtual ~ACE_POSIX_AIOCB_Asynch_Write_Stream (void); // = Methods belong to ACE_POSIX_Asynch_Operation base class. These @@ -1084,7 +1090,7 @@ public: /** * This starts off an asynchronous write. Upto <bytes_to_write> - * will be write and stored in the <message_block>. The write will + * will be written and stored in the <message_block>. The write will * start at <offset> from the beginning of the file. */ int write (ACE_Message_Block &message_block, @@ -1157,7 +1163,7 @@ public: /** * This starts off an asynchronous write. Upto <bytes_to_write> - * will be write and stored in the <message_block>. The write will + * will be written and stored in the <message_block>. The write will * start at <offset> from the beginning of the file. */ int write (ACE_Message_Block &message_block, @@ -1222,8 +1228,8 @@ class ACE_Export ACE_POSIX_Asynch_Accept_Result : public virtual ACE_Asynch_Acce public ACE_POSIX_Asynch_Result { /// Factory classes willl have special permissions. - friend class ACE_POSIX_AIOCB_Asynch_Accept; - friend class ACE_POSIX_SIG_Asynch_Accept; + friend class ACE_POSIX_Asynch_Accept; + friend class ACE_POSIX_Asynch_Accept_Handler; /// The Proactor constructs the Result class for faking results. friend class ACE_POSIX_Proactor; @@ -1330,23 +1336,24 @@ protected: }; /** - * @class ACE_POSIX_AIOCB_Asynch_Accept_Handler + * @class ACE_POSIX_Asynch_Accept_Handler * * Forward declaration. This class is defined the in the cpp file, * since this is internal to the implementation. */ -class ACE_POSIX_AIOCB_Asynch_Accept_Handler; +class ACE_POSIX_Asynch_Accept_Handler; /** - * @class ACE_POSIX_AIOCB_Asynch_Accept + * @class ACE_POSIX_Asynch_Accept * */ -class ACE_Export ACE_POSIX_AIOCB_Asynch_Accept : public virtual ACE_Asynch_Accept_Impl, - public ACE_POSIX_AIOCB_Asynch_Operation +class ACE_Export ACE_POSIX_Asynch_Accept : + public virtual ACE_Asynch_Accept_Impl, + public ACE_POSIX_Asynch_Operation { public: /// Constructor. - ACE_POSIX_AIOCB_Asynch_Accept (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor); + ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor * posix_proactor); /** * This <open> belongs to ACE_AIOCB_Asynch_Operation. We forward @@ -1377,15 +1384,28 @@ public: int signal_number = 0); /// Destructor. - virtual ~ACE_POSIX_AIOCB_Asynch_Accept (void); + virtual ~ACE_POSIX_Asynch_Accept (void); // = Methods belong to ACE_POSIX_Asynch_Operation base class. These // methods are defined here to avoid dominace warnings. They route // the call to the ACE_POSIX_Asynch_Operation base class. - /// - /// @@ Not implemented. Returns 0. + /** + * Cancel all pending pseudo-asynchronus requests + * Behavior as usual AIO request + */ int cancel (void); + + /** + * Close performs cancellation of all pending requests + * Parameter flg_notify can be + * 0 - don't send notifications about canceled accepts + * 1 - notify user about canceled accepts + * according POSIX standards we should receive notifications + * on canceled AIO requests + */ + int close ( int flg_notify); + /// Return the underlying proactor. ACE_Proactor* proactor (void) const; @@ -1398,83 +1418,16 @@ private: ACE_Reactor reactor_; /// The Event Handler to do handle_input. - ACE_POSIX_AIOCB_Asynch_Accept_Handler* accept_handler_; -}; - -/** - * @class ACE_POSIX_SIG_Asynch_Accept_Handler; - * - * Forward declaration. This class is defined the in the cpp file, - * since this is internal to the implementation. - */ -class ACE_POSIX_SIG_Asynch_Accept_Handler; - -/** - * @class ACE_POSIX_SIG_Asynch_Accept - * - * @brief This class implements <ACE_Asynch_Accept> for Realtime - * Signal (<sigtimedwait>) based implementation of Proactor. - */ -class ACE_Export ACE_POSIX_SIG_Asynch_Accept : public virtual ACE_Asynch_Accept_Impl, - public ACE_POSIX_SIG_Asynch_Operation -{ -public: - /// Constructor. - ACE_POSIX_SIG_Asynch_Accept (ACE_POSIX_SIG_Proactor *posix_sig_proactor); + ACE_POSIX_Asynch_Accept_Handler* accept_handler_; - /** - * This <open> belongs to ACE_SIG_Asynch_Operation. We forward this - * call to that method. We have put this here to avoid the compiler - * warnings. - */ - int open (ACE_Handler &handler, - ACE_HANDLE handle, - const void *completion_key, - ACE_Proactor *proactor = 0); + /// group id for the thread that we create for accepts + int grp_id_ ; - /** - * This starts off an asynchronous accept. The asynchronous accept - * call also allows any initial data to be returned to the - * <handler>. Upto <bytes_to_read> will be read and stored in the - * <message_block>. The <accept_handle> will be used for the - * <accept> call. If (<accept_handle> == INVALID_HANDLE), a new - * handle will be created. - * - * <message_block> must be specified. This is because the address of - * the new connection is placed at the end of this buffer. - */ - int accept (ACE_Message_Block &message_block, - u_long bytes_to_read, - ACE_HANDLE accept_handle, - const void *act, - int priority, - int signal_number); - - /// Destructor. - virtual ~ACE_POSIX_SIG_Asynch_Accept (void); - - // = Methods belong to ACE_POSIX_Asynch_Operation base class. These - // methods are defined here to avoid dominace warnings. They route - // the call to the ACE_POSIX_Asynch_Operation base class. - - /// - /// @@ Not implemented. Returns 0. - int cancel (void); - - /// Return the underlying proactor. - ACE_Proactor* proactor (void) const; - -private: - /// The thread function that does handle events. - static void* thread_function (void* reactor); - - /// Reactor to wait on the <listen_handle>. - ACE_Reactor reactor_; - - /// The Event Handler to do handle_input. - ACE_POSIX_SIG_Asynch_Accept_Handler* accept_handler_; + /// POSIX Proactor implementation + ACE_POSIX_Proactor * posix_proactor_; }; + /** * @class ACE_POSIX_Asynch_Transmit_File_Result * diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 8634454e154..7a5aa21b73e 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -1,3 +1,4 @@ +/* -*- C++ -*- */ // $Id$ #define ACE_BUILD_DLL @@ -421,6 +422,9 @@ ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Pr // Open the pipe. this->pipe_.open (); + // Let AIOCB_Proactor know about our handle + posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ()); + // Open the read stream. if (this->read_stream_.open (*this, this->pipe_.read_handle (), @@ -474,11 +478,12 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream: asynch_result = *(ACE_POSIX_Asynch_Result **) result.message_block ().rd_ptr (); // Do the upcall. - this->posix_aiocb_proactor_->application_specific_code (asynch_result, - 0, // No Bytes transferred. - 1, // Success. - 0, // Completion token. - 0); // Error. + this->posix_aiocb_proactor_->application_specific_code + (asynch_result, + asynch_result->bytes_transferred(), // 0, No bytes transferred. + 1, // Result : True. + 0, // No completion key. + asynch_result->error()); //0, No error. // Set the message block properly. Put the <wr_ptr> back in the // initial position. @@ -504,12 +509,13 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) aiocb_list_ (0), result_list_ (0), aiocb_list_max_size_ (max_aio_operations), - aiocb_list_cur_size_ (0) + aiocb_list_cur_size_ (0), + notify_pipe_read_handle_ (ACE_INVALID_HANDLE), + num_deferred_aiocb_ (0), + num_started_aio_(0) { - if (aiocb_list_max_size_ > 8192) - // @@ Alex, this shouldn't be a magic number -- it should be a - // constant, e.g., ACE_AIO_MAX_SIZE or something. - aiocb_list_max_size_ = 8192; + //check for correct value for max_aio_operations + check_max_aio_num () ; ACE_NEW (aiocb_list_, aiocb *[aiocb_list_max_size_]); @@ -532,14 +538,15 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,in aiocb_list_ (0), result_list_ (0), aiocb_list_max_size_ (max_aio_operations), - aiocb_list_cur_size_ (0) + aiocb_list_cur_size_ (0), + notify_pipe_read_handle_ (ACE_INVALID_HANDLE), + num_deferred_aiocb_ (0), + num_started_aio_(0) { ACE_UNUSED_ARG (Flg); - if (aiocb_list_max_size_ > 8192) - // @@ Alex, this shouldn't be a magic number -- it should be a - // constant, e.g., ACE_AIO_MAX_SIZE or something. - aiocb_list_max_size_ = 8192; + //check for correct value for max_aio_operations + check_max_aio_num () ; ACE_NEW (aiocb_list_, aiocb *[aiocb_list_max_size_]); @@ -562,13 +569,70 @@ ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) { delete_notify_manager (); + // delete all uncomlpeted operarion + // as nobody will notify client since now + for (size_t ai = 0; ai < aiocb_list_max_size_; ai++) + { + delete result_list_[ai] ; + result_list_[ai] = 0; + aiocb_list_[ai] = 0; + } + + delete [] aiocb_list_; aiocb_list_ = 0; - + delete [] result_list_; result_list_ = 0; } +void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h) +{ + notify_pipe_read_handle_ = h ; +} + +void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () +{ + long max_os_aio_num = ACE_OS ::sysconf ( _SC_AIO_MAX ); + + // Define max limit AIO's for concrete OS + // -1 means that there is no limit, but it is not true + // ( example, SunOS 5.6) + + if ( max_os_aio_num > 0 + && aiocb_list_max_size_ > ( unsigned long ) max_os_aio_num + ) + aiocb_list_max_size_ = max_os_aio_num ; + +#if defined(HPUX) + + // Although HPUX 11.00 allows to start 2048 AIO's + // for all process in system + // it has a limit 256 max elements for aio_suspend () + // It is a pity, but ... + + long max_os_listio_num = ACE_OS ::sysconf ( _SC_AIO_LISTIO_MAX ); + if ( max_os_listio_num > 0 + && aiocb_list_max_size_ > ( unsigned long ) max_os_listio_num + ) + aiocb_list_max_size_ = max_os_listio_num ; + +#endif + + // The last check for user-defined value + // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h + + if ( aiocb_list_max_size_ <= 0 + || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE + ) + aiocb_list_max_size_ = ACE_AIO_MAX_SIZE; + + ACE_DEBUG ((LM_DEBUG, + "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n", + aiocb_list_max_size_)); + +} + void ACE_POSIX_AIOCB_Proactor::create_notify_manager (void) { @@ -656,8 +720,10 @@ ACE_POSIX_AIOCB_Proactor::create_asynch_accept (void) { ACE_Asynch_Accept_Impl *implementation = 0; ACE_NEW_RETURN (implementation, - ACE_POSIX_AIOCB_Asynch_Accept (this), + ACE_POSIX_Asynch_Accept (this), 0); + //was ACE_POSIX_AIOCB_Asynch_Accept (this) + return implementation; } @@ -705,43 +771,56 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds) 0); // let continue work } + size_t index = 0; int error_status = 0; int return_status = 0; - ACE_POSIX_Asynch_Result *asynch_result = - find_completed_aio (error_status, return_status); + int retval= 0; - if (asynch_result == 0) - return 0; + for ( ; ; ) + { + ACE_POSIX_Asynch_Result *asynch_result = + find_completed_aio (error_status, return_status,index); + + if (asynch_result == 0) + break; + + //at least one processed + retval = 1 ; // more informative retval++ - // Call the application code. - this->application_specific_code (asynch_result, + // Call the application code. + this->application_specific_code (asynch_result, return_status, // Bytes transferred. 1, // Success 0, // No completion key. error_status); // Error - return 1; + } + + return retval; } ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, - int &return_status) + int &return_status, + size_t &index ) { ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0)); - - size_t ai; + ACE_POSIX_Asynch_Result *asynch_result = 0; error_status = 0; return_status= 0; - - for (ai = 0; ai < aiocb_list_max_size_; ai++) + + if ( num_started_aio_ == 0 ) // save time + return asynch_result; + + for (; index < aiocb_list_max_size_; index++) { - if (aiocb_list_[ai] == 0) // Dont process null blocks. + if (aiocb_list_[index] == 0) // Dont process null blocks. continue; // Get the error status of the aio_ operation. - error_status = aio_error (aiocb_list_[ai]); + error_status = aio_error (aiocb_list_[index]); if (error_status == -1) // <aio_error> itself has failed. { @@ -750,12 +829,16 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, "ACE_POSIX_AIOCB_Proactor::find_completed_aio:" "<aio_error> has failed\n")); - // skip this operation - aiocb_list_[ai] = 0; - result_list_[ai] = 0; - aiocb_list_cur_size_--; - - continue; + break; + + // we should notify user, otherwise : + // memory leak for result and "hanging" user + // what was before skip this operation + + //aiocb_list_[index] = 0; + //result_list_[index] = 0; + //aiocb_list_cur_size_--; + //continue; } // Continue the loop if <aio_> operation is still in progress. @@ -764,29 +847,40 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, } // end for - if (ai >= this->aiocb_list_max_size_) // all processed + if (index >= this->aiocb_list_max_size_) // all processed return asynch_result; else if (error_status == ECANCELED) return_status = 0; - else - return_status = aio_return (aiocb_list_[ai]); - + else if (error_status == -1) + return_status = 0; + else + return_status = aio_return (aiocb_list_[index]); + if (return_status == -1) { - // was ACE_ERROR_RETURN - ACE_ERROR ((LM_ERROR, + return_status = 0; // zero bytes transferred + + if (error_status == 0) // nonsense + ACE_ERROR ((LM_ERROR, "%N:%l:(%P | %t)::%p\n", "ACE_POSIX_AIOCB_Proactor::find_completed_aio:" "<aio_return> failed\n")); - return_status = 0; // zero bytes transferred } - asynch_result = result_list_[ai]; + + asynch_result = result_list_[index]; - aiocb_list_[ai] = 0; - result_list_[ai] = 0; + aiocb_list_[index] = 0; + result_list_[index] = 0; aiocb_list_cur_size_--; + num_started_aio_--; // decrement count active aios + index++ ; // for next iteration + + this->start_deferred_aio (); + //make attempt to start deferred AIO + //It is safe as we are protected by mutex_ + return asynch_result; } @@ -811,42 +905,107 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::register_and_start_aio"); ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1)); - + int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0; if (result == 0) // Just check the status of the list return ret_val; - // Non-zero ptr. Find a free slot and store. - if (ret_val == 0) + // Save operation code in the aiocb + switch ( op ) + { + case 0 : + result->aio_lio_opcode = LIO_READ; + break; + + case 1 : + result->aio_lio_opcode = LIO_WRITE; + break; + + default: + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::\n" + "register_and_start_aio: Invalid operation code\n"), + -1); + } + + if (ret_val != 0) // No free slot + { + errno = EAGAIN; + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::\n" + "register_and_start_aio: " + "No space to store the <aio>info\n"), + -1); + } + + // Find a free slot and store. + // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager + // so make check for ACE_AIOCB_Notify_Pipe_Manager request + + size_t i = 0; + + if ( notify_pipe_read_handle_ == result->aio_fildes ) // Notify_Pipe ? + { // should be free, + if ( result_list_[i] != 0 ) // only 1 request + { // is allowed + errno = EAGAIN; + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::\n" + "register_and_start_aio:" + "internal Proactor error 0\n"), + -1 ); + } + } + else //try to find free slot as usual, but starting from 1 { - for (size_t i= 0; i < this->aiocb_list_max_size_; i++) - if (aiocb_list_[i] == 0) - { - ret_val = start_aio (result, op); - - if (ret_val == 0) // Store the pointers. - { - aiocb_list_[i] = result; - result_list_[i] = result; - - aiocb_list_cur_size_++; - } - return ret_val; - } - - errno = EAGAIN; - ret_val = -1; + for ( i= 1; i < this->aiocb_list_max_size_; i++) + if (result_list_[i] == 0) + break ; } - - ACE_ERROR ((LM_ERROR, + + if ( i >= this->aiocb_list_max_size_ ) + ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" - "register_and_start_aio: No space to store the <aio>info\n")); - return ret_val; + "register_and_start_aio: " + "internal Proactor error 1\n"), + -1); + + result_list_[i] = result; //Store result ptr anyway + aiocb_list_cur_size_++; + + ret_val = start_aio (result); + + switch ( ret_val ) + { + case 0 : // started OK + aiocb_list_[i] = result; + return 0 ; + + case 1 : //OS AIO queue overflow + num_deferred_aiocb_ ++ ; + return 0 ; + + default: //Invalid request, there is no point + break; // to start it later + } + + result_list_[i] = 0; + aiocb_list_cur_size_--; + + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "register_and_start_aio: Invalid request to start <aio>\n")); + return -1; } +// start_aio has new return codes +// 0 AIO was started successfully +// 1 AIO was not started, OS AIO queue overflow +// -1 AIO was not started, other errors + int -ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, int op) +ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result) { ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio"); @@ -855,13 +1014,13 @@ ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, int op) // Start IO - switch (op) + switch (result->aio_lio_opcode ) { - case 0: + case LIO_READ : ptype = "read "; ret_val = aio_read (result); break; - case 1: + case LIO_WRITE : ptype = "write"; ret_val = aio_write (result); break; @@ -870,16 +1029,177 @@ ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, int op) ret_val = -1; break; } - - if (ret_val == -1) - ACE_ERROR ((LM_ERROR, + + if (ret_val == 0 ) + num_started_aio_ ++ ; + else // if (ret_val == -1) + { + if ( errno == EAGAIN ) //Ok, it will be deferred AIO + ret_val = 1 ; + else + ACE_ERROR ((LM_ERROR, "%N:%l:(%P | %t)::start_aio: aio_%s %p\n", ptype, "queueing failed\n")); + } return ret_val; } + +int +ACE_POSIX_AIOCB_Proactor::start_deferred_aio () +{ + ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio"); + + // This protected method is called from + // find_completed_aio after any AIO completion + // We should call this method always with locked + // ACE_POSIX_AIOCB_Proactor::mutex_ + // + // It tries to start the first deferred AIO + // if such exists + + if ( num_deferred_aiocb_ == 0 ) + return 0 ; // nothing to do + + size_t i = 0; + + for ( i= 0; i < this->aiocb_list_max_size_; i++) + if ( result_list_[i] !=0 // check for + && aiocb_list_[i] ==0 ) // deferred AIO + break ; + + if ( i >= this->aiocb_list_max_size_ ) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::\n" + "start_deferred_aio:" + "internal Proactor error 3\n"), + -1); + + ACE_POSIX_Asynch_Result *result = result_list_[i]; + + int ret_val = start_aio (result); + + switch ( ret_val ) + { + case 0 : //started OK , decrement count of deferred AIOs + aiocb_list_[i] = result; + num_deferred_aiocb_ -- ; + return 0 ; + + case 1 : + return 0 ; //try again later + + default : // Invalid Parameters , should never be + break ; + } + + //AL notify user + + result_list_[i] = 0; + aiocb_list_cur_size_--; + + num_deferred_aiocb_ -- ; + + result->set_error (errno); + result->set_bytes_transferred (0); + this->post_completion ( result ); + + return -1; +} + +int +ACE_POSIX_AIOCB_Proactor::cancel_aio ( ACE_HANDLE handle ) +{ + // This new method should be called from + // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel + // It scans the result_list_ and defines all AIO requests + // that were issued for handle "handle" + // + // For all deferred AIO requests with handle "handle" + // it removes its from the lists and notifies user + // + // For all running AIO requests with handle "handle" + // it calls ::aio_cancel. According to the POSIX standards + // we will receive ECANCELED for all ::aio_canceled AIO requests + // later on return from ::aio_suspend + + ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio"); + + ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1)); + + int num_total = 0; + int num_cancelled = 0; + size_t ai = 0; + + for (ai = 0; ai < aiocb_list_max_size_; ai++) + { + if ( result_list_[ai] == 0 ) //skip empty slot + continue ; + + if ( result_list_[ai]->aio_fildes != handle ) //skip not our slot + continue ; + + num_total++ ; + + ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai]; + + if ( aiocb_list_ [ai] == 0 ) //deferred aio + { + num_cancelled ++ ; + num_deferred_aiocb_ -- ; + + aiocb_list_[ai] = 0; + result_list_[ai] = 0; + aiocb_list_cur_size_--; + + asynch_result->set_error (ECANCELED); + asynch_result->set_bytes_transferred (0); + this->post_completion ( asynch_result ); + } + else //cancel started aio + { + int rc_cancel = this->cancel_aiocb (asynch_result ); + + if ( rc_cancel == 0 ) //notification in the future + num_cancelled ++ ; //it is OS responsiblity + } + } + + if ( num_total == 0 ) + return 1; // ALLDONE + + if ( num_cancelled == num_total ) + return 0; // CANCELLED + + return 2; // NOT CANCELLED +} + +int +ACE_POSIX_AIOCB_Proactor::cancel_aiocb ( ACE_POSIX_Asynch_Result * result ) +{ + // This new method is called from cancel_aio + // to cancel concrete running AIO request + int rc = ::aio_cancel (0, result ); + + // Check the return value and return 0/1/2 appropriately. + if (rc == AIO_CANCELED) + return 0; + else if (rc == AIO_ALLDONE) + return 1; + else if (rc == AIO_NOTCANCELED) + return 2; + + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "cancel_aiocb:" + "Unexpected result from <aio_cancel>"), + -1); + +} + + // ********************************************************************* ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void) @@ -1048,8 +1368,10 @@ ACE_POSIX_SIG_Proactor::create_asynch_accept (void) { ACE_Asynch_Accept_Impl *implementation = 0; ACE_NEW_RETURN (implementation, - ACE_POSIX_SIG_Asynch_Accept (this), + ACE_POSIX_Asynch_Accept (this), 0); + + // was ACE_POSIX_SIG_Asynch_Accept (this), return implementation; } @@ -1274,7 +1596,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) // Failure. if (return_status == -1) - { + { ACE_ERROR ((LM_ERROR, "%N:%l:(%P | %t)::%p\n", "ACE_POSIX_SIG_Proactor::handle_events:" @@ -1294,11 +1616,12 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) } else if (sig_info.si_code == SI_QUEUE) { - this->application_specific_code (asynch_result, - 0, // No bytes transferred. - 1, // Result : True. - 0, // No completion key. - 0); // No error. + this->application_specific_code + (asynch_result, + asynch_result->bytes_transferred(), // 0, No bytes transferred. + 1, // Result : True. + 0, // No completion key. + asynch_result->error()); //0, No error. } else // Unknown signal code. diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h index d8f61a25b0f..836c97e3459 100644 --- a/ace/POSIX_Proactor.h +++ b/ace/POSIX_Proactor.h @@ -26,6 +26,9 @@ #include "ace/Pipe.h" #include "ace/POSIX_Asynch_IO.h" +#define ACE_AIO_MAX_SIZE 2048 +#define ACE_AIO_DEFAULT_SIZE 1024 + /** * @class ACE_POSIX_Proactor * @@ -234,7 +237,7 @@ class ACE_Export ACE_POSIX_AIOCB_Proactor : public ACE_POSIX_Proactor public: /// Constructor defines max number asynchronous operations /// which can be started at the same time - ACE_POSIX_AIOCB_Proactor (size_t nmaxop = 256) ; //ACE_RTSIG_MAX + ACE_POSIX_AIOCB_Proactor (size_t nmaxop = ACE_AIO_DEFAULT_SIZE); virtual Proactor_Type get_impl_type (void); @@ -276,6 +279,19 @@ public: virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void); + /** + * This method should be called from + * ACE_POSIX_Asynch_Operation::cancel() + * instead of usual ::aio_cancel. + * For all deferred AIO requests with handle "h" + * it removes its from the lists and notifies user. + * For all running AIO requests with handle "h" + * it calls ::aio_cancel. According to the POSIX standards + * we will receive ECANCELED for all ::aio_canceled AIO requests + * later on return from ::aio_suspend + */ + virtual int cancel_aio (ACE_HANDLE h); + protected: /// Special constructor for ACE_SUN_Proactor @@ -286,6 +302,15 @@ protected: void create_notify_manager (void); void delete_notify_manager (void); + /// Define the maximum number of asynchronous I/O requests + /// for the current OS + void check_max_aio_num (void) ; + + /// To identify requests from Notify_Pipe_Manager + void set_notify_handle (ACE_HANDLE h); + + + /** * Dispatch a single set of events. If <milli_seconds> elapses * before any events occur, return 0. Return 1 if a completion @@ -303,12 +328,20 @@ protected: virtual int register_and_start_aio (ACE_POSIX_Asynch_Result *result, int op); - virtual int start_aio (ACE_POSIX_Asynch_Result *result, - int op); + + /// Op code now is saved in ACE_POSIX_Asynch_Result + virtual int start_aio (ACE_POSIX_Asynch_Result *result); + + /// Start deferred AIO if necessary + int start_deferred_aio(); + + /// Cancel running or deferred AIO + virtual int cancel_aiocb ( ACE_POSIX_Asynch_Result * result ); /// Extract the results of aio. ACE_POSIX_Asynch_Result *find_completed_aio (int &error_status, - int &return_status); + int &return_status, + size_t &index ); /// This class takes care of doing <accept> when we use /// AIO_CONTROL_BLOCKS strategy. @@ -329,6 +362,18 @@ protected: /// Mutex to protect work with lists. ACE_Thread_Mutex mutex_; #endif /* ACE_MT_SAFE */ + + /// The purpose of this member is only to identify asynchronous request + /// from NotifyManager. We will reserve for it always slot 0 + /// in the list of aiocb's to be sure that don't lose notifications. + ACE_HANDLE notify_pipe_read_handle_ ; + + /// number of ACE_POSIX_Asynch_Result's waiting for start + /// i.e. deferred AIOs + size_t num_deferred_aiocb_ ; + + /// Number active,i.e. running requests + size_t num_started_aio_ ; }; /** diff --git a/ace/SUN_Proactor.cpp b/ace/SUN_Proactor.cpp index 33c0f266fcd..f341bce521d 100644 --- a/ace/SUN_Proactor.cpp +++ b/ace/SUN_Proactor.cpp @@ -1,3 +1,4 @@ +/* -*- C++ -*- */ // $Id$ #include "ace/SUN_Proactor.h" @@ -13,7 +14,8 @@ #endif /* __ACE_INLINE__ */ ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations) - : ACE_POSIX_AIOCB_Proactor (max_aio_operations , 0) + : ACE_POSIX_AIOCB_Proactor (max_aio_operations , 0), + condition_ (mutex_) { // To provide correct virtual calls. create_notify_manager (); @@ -29,6 +31,8 @@ ACE_SUN_Proactor::~ACE_SUN_Proactor (void) int ACE_SUN_Proactor::handle_events (ACE_Time_Value &wait_time) { + // Decrement <wait_time> with the amount of time spent in the method + ACE_Countdown_Time countdown (&wait_time); return this->handle_events (wait_time.msec ()); } @@ -38,31 +42,73 @@ ACE_SUN_Proactor::handle_events (void) return this->handle_events (ACE_INFINITE); } +int ACE_SUN_Proactor::wait_for_start (ACE_Time_Value * abstime) +{ +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + + ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, -1)); + + if (num_started_aio_ != 0) // double check + return 0; + + return condition_.wait (abstime) ; + +#else + + return 0; // or -1 ??? + +#endif /* ACE_MT_SAFE */ +} + int ACE_SUN_Proactor::handle_events (u_long milli_seconds) { aio_result_t *result = 0; if (milli_seconds == ACE_INFINITE) - result = aiowait (0); + { + if (num_started_aio_ == 0) + wait_for_start (0); + + result = aiowait (0); + } else { struct timeval timeout; timeout.tv_sec = milli_seconds / 1000; timeout.tv_usec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000; + + if (num_started_aio_ == 0) + { + ACE_Time_Value tv (timeout); + + tv += ACE_OS::gettimeofday (); + + wait_for_start (&tv); + } result = aiowait (&timeout); } + if (ACE_reinterpret_cast (long, result) == 0) + return 0; // timeout + if (ACE_reinterpret_cast (long, result) == -1) + { // Check errno for EINVAL,EAGAIN,EINTR ?? - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_SUN_Proactor::handle_events:" - "aiowait failed"), - 0); - - if (ACE_reinterpret_cast (long, result) == -1) - return 0; // timeout + switch (errno) + { + case EINTR : // aiowait() was interrupted by a signal. + case EINVAL: //There are no outstanding asynchronous I/O requests. + return 0; + + default: // EFAULT + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::%p \nNumAIO=%d\n", + "ACE_SUN_Proactor::handle_events: aiowait failed", + num_started_aio_), + -1); + } + } int error_status = 0; int return_status = 0; @@ -96,16 +142,17 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result, return_status= 0; // we call find_completed_aio always with result != 0 - + for (ai = 0; ai < aiocb_list_max_size_; ai++) - if (result == &aiocb_list_[ai]->aio_resultp) + if (aiocb_list_[ai] !=0 && //check for non zero + result == &aiocb_list_[ai]->aio_resultp) break; - + if (ai >= aiocb_list_max_size_) // not found return 0; error_status = result->aio_errno; - return_status= result->aio_return; + return_status= result->aio_return; if (error_status == -1) // should never be { @@ -114,27 +161,44 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result, "ACE_SUN_Proactor::find_completed_aio:" "<aio_errno> has failed\n")); - aiocb_list_[ai] = 0; - result_list_[ai] = 0; - aiocb_list_cur_size_--; - - return 0; - } + return_status = 0; - if (error_status == EINPROGRESS) // should never be - return 0; + // we should notify user, otherwise : + // memory leak for result and "hanging" user + // what was before : - if (error_status == ECANCELED) - return_status = 0; + // aiocb_list_[ai] = 0; + // result_list_[ai] = 0; + // aiocb_list_cur_size_--; + // return 0; + } - if (return_status == -1) + switch (error_status) { - // was ACE_ERROR_RETURN - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_SUN_Proactor::find_completed_aio:" - "<aio_return> failed\n")); - return_status = 0; // zero bytes transferred + case EINPROGRESS : // should never be + case AIO_INPROGRESS : // according to SUN doc + return 0; + + case ECANCELED : // canceled + return_status = 0; + break; + + case 0 : // no error + if (return_status == -1) // return_status should be >= 0 + { + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_SUN_Proactor::find_completed_aio:" + "<aio_return> failed\n")); + + return_status = 0; // zero bytes transferred + } + break; + + default : // other errors + if (return_status == -1) // normal status for I/O Error + return_status = 0; // zero bytes transferred + break; } ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai]; @@ -143,12 +207,22 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result, result_list_[ai] = 0; aiocb_list_cur_size_--; + num_started_aio_ --; + + start_deferred_aio (); + //make attempt to start deferred AIO + //It is safe as we are protected by mutex_ + return asynch_result; } +// start_aio has new return codes +// 0 successful start +// 1 try later, OS queue overflow +// -1 invalid request and other errors + int -ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, - int op) +ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result) { ACE_TRACE ("ACE_SUN_Proactor::start_aio"); @@ -157,9 +231,9 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, // Start IO - switch (op) + switch (result->aio_lio_opcode) { - case 0 : + case LIO_READ : ptype = "read"; ret_val = aioread (result->aio_fildes, (char *) result->aio_buf, @@ -169,7 +243,7 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, &result->aio_resultp); break; - case 1 : + case LIO_WRITE : ptype = "write"; ret_val = aiowrite (result->aio_fildes, (char *) result->aio_buf, @@ -184,14 +258,62 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, ret_val = -1; break; } - - if (ret_val == -1) - ACE_ERROR ((LM_ERROR, + + if (ret_val == 0) + { + num_started_aio_ ++ ; + if (num_started_aio_ == 1) // wake up condition + condition_.broadcast (); + } + else // if (ret_val == -1) + { + if (errno == EAGAIN) //try later, it will be deferred AIO + ret_val = 1 ; + else + ACE_ERROR ((LM_ERROR, "%N:%l:(%P | %t)::start_aio: aio%s %p\n", ptype, "queueing failed\n")); + } + return ret_val; } -#endif /* ACE_HAS_AIO_CALLS && sun */ +int +ACE_SUN_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result) +{ + ACE_UNUSED_ARG (result); + return 2 ; // not implemented + +// AL +// I tried to implement the following code +// But result was : aiocancel returned -1 with errno=ACCESS_DENIED +// moreover, later this operation had been never finished +// on aiowait . +// Is it Sun error ?? +// +// So with SUN_Proactor there is only one way to cancel AIO +// just close the file handle. +// +// +// int rc = ::aiocancel (& result->aio_resultp); +// +// Check the return value and return 0/1/2 appropriately. +// if (rc == 0) // AIO_CANCELED +// return 0; +// +// ACE_ERROR_RETURN ((LM_ERROR, +// "%N:%l:(%P | %t)::%p\n", +// "cancel_aiocb:" +// "Unexpected result from <aiocancel>"), +// -1); +} +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Condition<ACE_Thread_Mutex>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +// These are only instantiated with ACE_HAS_THREADS. +#pragma instantiate ACE_Condition<ACE_Thread_Mutex> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + +#endif /* ACE_HAS_AIO_CALLS && sun */ diff --git a/ace/SUN_Proactor.h b/ace/SUN_Proactor.h index 5bb8feb08b9..78d2d06ee02 100644 --- a/ace/SUN_Proactor.h +++ b/ace/SUN_Proactor.h @@ -64,11 +64,9 @@ public: /// Destructor. virtual ~ACE_SUN_Proactor (void); - // @@ Alex, this shouldn't be a magic number, i.e., it should be a - // constant, such as ACE_MAX_AIO_OPERATIONS. /// Constructor defines max number asynchronous operations that can /// be started at the same time. - ACE_SUN_Proactor (size_t max_aio_operations = 512); + ACE_SUN_Proactor (size_t max_aio_operations = ACE_AIO_DEFAULT_SIZE); protected: /** @@ -96,12 +94,25 @@ protected: virtual int handle_events (void); /// From ACE_POSIX_AIOCB_Proactor. - virtual int start_aio (ACE_POSIX_Asynch_Result *result, int op); + virtual int start_aio (ACE_POSIX_Asynch_Result *result); /// Extract the results of aio. ACE_POSIX_Asynch_Result *find_completed_aio (aio_result_t *result, int &error_status, int &return_status); + + /// From ACE_POSIX_AIOCB_Proactor. + /// Attempt to cancel running request + virtual int cancel_aiocb ( ACE_POSIX_Asynch_Result * result ); + + /// Specific Sun aiowait + int wait_for_start (ACE_Time_Value * abstime); + +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + /// Condition variable . + /// used to wait the first AIO start + ACE_Condition<ACE_Thread_Mutex> condition_; +#endif /* ACE_MT_SAFE */ }; #if defined (__ACE_INLINE__) |