// $Id$ #include "ace/POSIX_Proactor.h" #if defined (ACE_HAS_AIO_CALLS) #if !defined (__ACE_INLINE__) #include "ace/POSIX_Proactor.inl" #endif /* __ACE_INLINE__ */ # if defined (ACE_HAS_SYS_SYSTEMINFO_H) # include /**/ # endif /* ACE_HAS_SYS_SYSTEMINFO_H */ #include "ace/ACE.h" #include "ace/Flag_Manip.h" #include "ace/Task_T.h" #include "ace/Log_Msg.h" #include "ace/Object_Manager.h" #include "ace/OS_NS_sys_socket.h" #include "ace/OS_NS_signal.h" #include "ace/OS_NS_unistd.h" #if defined (sun) # include "ace/OS_NS_strings.h" #endif /* sun */ // ********************************************************************* ACE_BEGIN_VERSIONED_NAMESPACE_DECL /** * @class ACE_POSIX_Wakeup_Completion * * This result object is used by the of the * ACE_Proactor interface to wake up all the threads blocking * for completions. */ class ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result { public: /// Constructor. ACE_POSIX_Wakeup_Completion (const ACE_Handler::Proxy_Ptr &handler_proxy, const void *act = 0, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /// Destructor. virtual ~ACE_POSIX_Wakeup_Completion (void); /// This method calls the 's method. virtual void complete (size_t bytes_transferred = 0, int success = 1, const void *completion_key = 0, u_long error = 0); }; // ********************************************************************* ACE_POSIX_Proactor::ACE_POSIX_Proactor (void) : os_id_ (ACE_OS_UNDEFINED) { #if defined(sun) os_id_ = ACE_OS_SUN; // set family char Buf [32]; ::memset(Buf,0,sizeof(Buf)); ACE_OS::sysinfo (SI_RELEASE , Buf, sizeof(Buf)-1); if (ACE_OS::strcasecmp (Buf , "5.6") == 0) os_id_ = ACE_OS_SUN_56; else if (ACE_OS::strcasecmp (Buf , "5.7") == 0) os_id_ = ACE_OS_SUN_57; else if (ACE_OS::strcasecmp (Buf , "5.8") == 0) os_id_ = ACE_OS_SUN_58; #elif defined(HPUX) os_id_ = ACE_OS_HPUX; // set family #elif defined(__sgi) os_id_ = ACE_OS_IRIX; // set family #elif defined(__OpenBSD) os_id_ = ACE_OS_OPENBSD; // set family // do the same //#else defined (LINUX, __FreeBSD__ ...) //setup here os_id_ #endif } ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void) { this->close (); } int ACE_POSIX_Proactor::close (void) { return 0; } int ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle, const void *completion_key) { ACE_UNUSED_ARG (handle); ACE_UNUSED_ARG (completion_key); return 0; } int ACE_POSIX_Proactor::wake_up_dispatch_threads (void) { return 0; } int ACE_POSIX_Proactor::close_dispatch_threads (int) { return 0; } size_t ACE_POSIX_Proactor::number_of_threads (void) const { // @@ Implement it. ACE_NOTSUP_RETURN (0); } void ACE_POSIX_Proactor::number_of_threads (size_t threads) { // @@ Implement it. ACE_UNUSED_ARG (threads); } ACE_HANDLE ACE_POSIX_Proactor::get_handle (void) const { return ACE_INVALID_HANDLE; } ACE_Asynch_Read_Stream_Impl * ACE_POSIX_Proactor::create_asynch_read_stream (void) { ACE_Asynch_Read_Stream_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Read_Stream (this), 0); return implementation; } ACE_Asynch_Read_Stream_Result_Impl * ACE_POSIX_Proactor::create_asynch_read_stream_result (const ACE_Handler::Proxy_Ptr &handler_proxy, ACE_HANDLE handle, ACE_Message_Block &message_block, size_t bytes_to_read, const void* act, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Read_Stream_Result_Impl *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy, handle, message_block, bytes_to_read, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Write_Stream_Impl * ACE_POSIX_Proactor::create_asynch_write_stream (void) { ACE_Asynch_Write_Stream_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Write_Stream (this), 0); return implementation; } ACE_Asynch_Write_Stream_Result_Impl * ACE_POSIX_Proactor::create_asynch_write_stream_result (const ACE_Handler::Proxy_Ptr &handler_proxy, ACE_HANDLE handle, ACE_Message_Block &message_block, size_t bytes_to_write, const void* act, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Write_Stream_Result_Impl *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy, handle, message_block, bytes_to_write, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Read_File_Impl * ACE_POSIX_Proactor::create_asynch_read_file (void) { ACE_Asynch_Read_File_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Read_File (this), 0); return implementation; } ACE_Asynch_Read_File_Result_Impl * ACE_POSIX_Proactor::create_asynch_read_file_result (const ACE_Handler::Proxy_Ptr &handler_proxy, ACE_HANDLE handle, ACE_Message_Block &message_block, size_t bytes_to_read, const void* act, u_long offset, u_long offset_high, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Read_File_Result_Impl *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Read_File_Result (handler_proxy, handle, message_block, bytes_to_read, act, offset, offset_high, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Write_File_Impl * ACE_POSIX_Proactor::create_asynch_write_file (void) { ACE_Asynch_Write_File_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Write_File (this), 0); return implementation; } ACE_Asynch_Write_File_Result_Impl * ACE_POSIX_Proactor::create_asynch_write_file_result (const ACE_Handler::Proxy_Ptr &handler_proxy, ACE_HANDLE handle, ACE_Message_Block &message_block, size_t bytes_to_write, const void* act, u_long offset, u_long offset_high, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Write_File_Result_Impl *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Write_File_Result (handler_proxy, handle, message_block, bytes_to_write, act, offset, offset_high, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Read_Dgram_Impl * ACE_POSIX_Proactor::create_asynch_read_dgram (void) { ACE_Asynch_Read_Dgram_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Read_Dgram (this), 0); return implementation; } ACE_Asynch_Read_Dgram_Result_Impl * ACE_POSIX_Proactor::create_asynch_read_dgram_result (const ACE_Handler::Proxy_Ptr &handler_proxy, ACE_HANDLE handle, ACE_Message_Block *message_block, size_t bytes_to_read, int flags, int protocol_family, const void* act, ACE_HANDLE event , int priority , int signal_number) { ACE_Asynch_Read_Dgram_Result_Impl *implementation=0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Read_Dgram_Result(handler_proxy, handle, message_block, bytes_to_read, flags, protocol_family, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Write_Dgram_Impl * ACE_POSIX_Proactor::create_asynch_write_dgram (void) { ACE_Asynch_Write_Dgram_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Write_Dgram (this), 0); return implementation; } ACE_Asynch_Write_Dgram_Result_Impl * ACE_POSIX_Proactor::create_asynch_write_dgram_result (const ACE_Handler::Proxy_Ptr &handler_proxy, ACE_HANDLE handle, ACE_Message_Block *message_block, size_t bytes_to_write, int flags, const void* act, ACE_HANDLE event, int priority , int signal_number) { ACE_Asynch_Write_Dgram_Result_Impl *implementation=0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Write_Dgram_Result(handler_proxy, handle, message_block, bytes_to_write, flags, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Accept_Impl * ACE_POSIX_Proactor::create_asynch_accept (void) { ACE_Asynch_Accept_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Accept (this), 0); return implementation; } ACE_Asynch_Accept_Result_Impl * ACE_POSIX_Proactor::create_asynch_accept_result (const ACE_Handler::Proxy_Ptr &handler_proxy, ACE_HANDLE listen_handle, ACE_HANDLE accept_handle, ACE_Message_Block &message_block, size_t bytes_to_read, const void* act, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Accept_Result_Impl *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Accept_Result (handler_proxy, listen_handle, accept_handle, message_block, bytes_to_read, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Connect_Impl * ACE_POSIX_Proactor::create_asynch_connect (void) { ACE_Asynch_Connect_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Connect (this), 0); return implementation; } ACE_Asynch_Connect_Result_Impl * ACE_POSIX_Proactor::create_asynch_connect_result (const ACE_Handler::Proxy_Ptr &handler_proxy, ACE_HANDLE connect_handle, const void* act, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Connect_Result_Impl *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Connect_Result (handler_proxy, connect_handle, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Transmit_File_Impl * ACE_POSIX_Proactor::create_asynch_transmit_file (void) { ACE_Asynch_Transmit_File_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Transmit_File (this), 0); return implementation; } ACE_Asynch_Transmit_File_Result_Impl * ACE_POSIX_Proactor::create_asynch_transmit_file_result (const ACE_Handler::Proxy_Ptr &handler_proxy, ACE_HANDLE socket, ACE_HANDLE file, ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer, size_t bytes_to_write, u_long offset, u_long offset_high, size_t bytes_per_send, u_long flags, const void *act, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Transmit_File_Result_Impl *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Transmit_File_Result (handler_proxy, socket, file, header_and_trailer, bytes_to_write, offset, offset_high, bytes_per_send, flags, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Result_Impl * ACE_POSIX_Proactor::create_asynch_timer (const ACE_Handler::Proxy_Ptr &handler_proxy, const void *act, const ACE_Time_Value &tv, ACE_HANDLE event, int priority, int signal_number) { ACE_POSIX_Asynch_Timer *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Timer (handler_proxy, act, tv, event, priority, signal_number), 0); return implementation; } #if 0 int ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *) { // Perform a non-blocking "poll" for all the I/O events that have // completed in the I/O completion queue. ACE_Time_Value timeout (0, 0); int result = 0; for (;;) { result = this->handle_events (timeout); if (result != 0 || errno == ETIME) break; } // If our handle_events failed, we'll report a failure to the // Reactor. return result == -1 ? -1 : 0; } int ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { ACE_UNUSED_ARG (close_mask); ACE_UNUSED_ARG (handle); return this->close (); } #endif /* 0 */ void ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result, size_t bytes_transferred, const void */* completion_key*/, u_long error) { ACE_SEH_TRY { // Call completion hook asynch_result->complete (bytes_transferred, error ? 0 : 1, 0, // No completion key. error); } ACE_SEH_FINALLY { // This is crucial to prevent memory leaks delete asynch_result; } } int ACE_POSIX_Proactor::post_wakeup_completions (int how_many) { ACE_POSIX_Wakeup_Completion *wakeup_completion = 0; for (int ci = 0; ci < how_many; ci++) { ACE_NEW_RETURN (wakeup_completion, ACE_POSIX_Wakeup_Completion (this->wakeup_handler_.proxy ()), -1); if (this->post_completion (wakeup_completion) == -1) return -1; } return 0; } ACE_POSIX_Proactor::Proactor_Type ACE_POSIX_Proactor::get_impl_type (void) { return PROACTOR_POSIX; } /** * @class ACE_AIOCB_Notify_Pipe_Manager * * @brief This class manages the notify pipe of the AIOCB Proactor. * * This class acts as the Handler for the * operations issued on the notify pipe. This * class is very useful in implementing operation * class for the . This is also useful for * implementing for . * class issues a on * the pipe, using this class as the * Handler. 's are sent through the * notify pipe. When 's show up on the * notify pipe, the dispatches the * completion of the and calls the * of this class. This class calls * on the and thus calls the * application handler. * Handling the MessageBlock: * We give this message block to read the result pointer through * the notify pipe. We expect that to read 4 bytes from the * notify pipe, for each call. Before giving this * message block to another , we update and put * it in its initial position. */ class ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler { public: /// Constructor. You need the posix proactor because you need to call /// ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor); /// Destructor. virtual ~ACE_AIOCB_Notify_Pipe_Manager (void); /// Send the result pointer through the notification pipe. int notify (); /// This is the call back method when from the pipe is /// complete. virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); private: /// The implementation proactor class. ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_; /// Message block to get ACE_POSIX_Asynch_Result pointer from the pipe. ACE_Message_Block message_block_; /// Pipe for the communication between Proactor and the /// Asynch_Accept/Asynch_Connect and other post_completions ACE_Pipe pipe_; /// To do asynch_read on the pipe. ACE_POSIX_Asynch_Read_Stream read_stream_; /// Default constructor. Shouldnt be called. ACE_AIOCB_Notify_Pipe_Manager (void); }; ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : posix_aiocb_proactor_ (posix_aiocb_proactor), message_block_ (sizeof (2)), read_stream_ (posix_aiocb_proactor) { // Open the pipe. this->pipe_.open (); // Set write side in NONBLOCK mode ACE::set_flags (this->pipe_.write_handle (), ACE_NONBLOCK); // Set read side in BLOCK mode ACE::clr_flags (this->pipe_.read_handle (), ACE_NONBLOCK); // Let AIOCB_Proactor know about our handle posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ()); // Open the read stream. if (this->read_stream_.open (this->proxy (), this->pipe_.read_handle (), 0, // Completion Key 0) // Proactor == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT("%N:%l:%p\n"), ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:") ACE_TEXT("Open on Read Stream failed"))); // Issue an asynch_read on the read_stream of the notify pipe. if (this->read_stream_.read (this->message_block_, 1, // enough to read 1 byte 0, // ACT 0) // Priority == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT("%N:%l:%p\n"), ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:") ACE_TEXT("Read from pipe failed"))); } ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void) { // 1. try to cancel pending aio this->read_stream_.cancel (); // 2. close both handles // Destuctor of ACE_Pipe does not close handles. // We can not use ACE_Pipe::close() as it // closes read_handle and than write_handle. // In some systems close() may wait for // completion for all asynch. pending requests. // So we should close write_handle firstly // to force read completion ( if 1. does not help ) // and then read_handle and not vice versa ACE_HANDLE h = this->pipe_.write_handle (); if (h != ACE_INVALID_HANDLE) ACE_OS::closesocket (h); h = this->pipe_.read_handle (); if ( h != ACE_INVALID_HANDLE) ACE_OS::closesocket (h); } int ACE_AIOCB_Notify_Pipe_Manager::notify () { // Send the result pointer through the pipe. char char_send = 0; ssize_t ret_val = ACE::send (this->pipe_.write_handle (), &char_send, sizeof (char_send)); if (ret_val < 0) { if (errno != EWOULDBLOCK) #if 0 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P %t):%p\n"), ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify") ACE_TEXT ("Error:Writing on to notify pipe failed"))); #endif /* 0 */ return -1; } return 0; } void ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream::Result & /*result*/) { // 1. Start new read to avoid pipe overflow // Set the message block properly. Put the back in the // initial position. if (this->message_block_.length () > 0) this->message_block_.wr_ptr (this->message_block_.rd_ptr ()); // One accept has completed. Issue a read to handle any // s in the future. if (-1 == this->read_stream_.read (this->message_block_, 1, // enough to read 1 byte 0, // ACT 0)) // Priority ACE_ERROR ((LM_ERROR, ACE_TEXT ("%N:%l:(%P | %t):%p\n"), ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:") ACE_TEXT ("Read from pipe failed"))); // 2. Do the upcalls // this->posix_aiocb_proactor_->process_result_queue (); } // Public constructor for common use. ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) : aiocb_notify_pipe_manager_ (0), aiocb_list_ (0), result_list_ (0), aiocb_list_max_size_ (max_aio_operations), aiocb_list_cur_size_ (0), notify_pipe_read_handle_ (ACE_INVALID_HANDLE), num_deferred_aiocb_ (0), num_started_aio_ (0) { // Check for correct value for max_aio_operations check_max_aio_num (); this->create_result_aiocb_list (); this->create_notify_manager (); // start pseudo-asynchronous accept task // one per all future acceptors this->get_asynch_pseudo_task().start (); } // Special protected constructor for ACE_SUN_Proactor ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, ACE_POSIX_Proactor::Proactor_Type) : aiocb_notify_pipe_manager_ (0), aiocb_list_ (0), result_list_ (0), aiocb_list_max_size_ (max_aio_operations), aiocb_list_cur_size_ (0), notify_pipe_read_handle_ (ACE_INVALID_HANDLE), num_deferred_aiocb_ (0), num_started_aio_ (0) { //check for correct value for max_aio_operations this->check_max_aio_num (); this->create_result_aiocb_list (); // @@ We should create Notify_Pipe_Manager in the derived class to // provide correct calls for virtual functions !!! } // Destructor. ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) { this->close(); } ACE_POSIX_Proactor::Proactor_Type ACE_POSIX_AIOCB_Proactor::get_impl_type (void) { return PROACTOR_AIOCB; } int ACE_POSIX_AIOCB_Proactor::close (void) { // stop asynch accept task this->get_asynch_pseudo_task().stop (); this->delete_notify_manager (); this->clear_result_queue (); return this->delete_result_aiocb_list (); } void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h) { notify_pipe_read_handle_ = h; } int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void) { if (aiocb_list_ != 0) return 0; ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1); ACE_NEW_RETURN (result_list_, ACE_POSIX_Asynch_Result *[aiocb_list_max_size_], -1); // Initialize the array. for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) { aiocb_list_[ai] = 0; result_list_[ai] = 0; } return 0; } int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void) { if (aiocb_list_ == 0) // already deleted return 0; size_t ai; // Try to cancel all uncompleted operations; POSIX systems may have // hidden system threads that still can work with our aiocbs! for (ai = 0; ai < aiocb_list_max_size_; ai++) if (this->aiocb_list_[ai] != 0) // active operation this->cancel_aiocb (result_list_[ai]); int num_pending = 0; for (ai = 0; ai < aiocb_list_max_size_; ai++) { if (this->aiocb_list_[ai] == 0 ) // not active operation continue; // Get the error and return status of the aio_ operation. int error_status = 0; size_t transfer_count = 0; int flg_completed = this->get_result_status (result_list_[ai], error_status, transfer_count); //don't delete uncompleted AIOCB's if (flg_completed == 0) // not completed !!! { num_pending++; #if 0 char * errtxt = ACE_OS::strerror (error_status); if (errtxt == 0) errtxt ="?????????"; char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )? "WRITE":"READ" ; ACE_ERROR ((LM_ERROR, ACE_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"), ai, op, error_status, transfer_count, errtxt)); #endif /* 0 */ } else // completed , OK { delete this->result_list_[ai]; this->result_list_[ai] = 0; this->aiocb_list_[ai] = 0; } } // If it is not possible cancel some operation (num_pending > 0 ), // we can do only one thing -report about this // and complain about POSIX implementation. // We know that we have memory leaks, but it is better than // segmentation fault! ACE_DEBUG ((LM_DEBUG, ACE_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n") ACE_TEXT(" number pending AIO=%d\n"), num_pending)); delete [] this->aiocb_list_; this->aiocb_list_ = 0; delete [] this->result_list_; this->result_list_ = 0; return (num_pending == 0 ? 0 : -1); // ?? or just always return 0; } void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () { long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX); // Define max limit AIO's for concrete OS // -1 means that there is no limit, but it is not true // (example, SunOS 5.6) if (max_os_aio_num > 0 && aiocb_list_max_size_ > (unsigned long) max_os_aio_num) aiocb_list_max_size_ = max_os_aio_num; #if defined (HPUX) || defined (__FreeBSD__) // Although HPUX 11.00 allows to start 2048 AIO's for all process in // system it has a limit 256 max elements for aio_suspend () It is a // pity, but ... long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX); if (max_os_listio_num > 0 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num) aiocb_list_max_size_ = max_os_listio_num; #endif /* HPUX || __FreeBSD__ */ // check for user-defined value // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h if (aiocb_list_max_size_ <= 0 || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE) aiocb_list_max_size_ = ACE_AIO_MAX_SIZE; // check for max number files to open int max_num_files = ACE::max_handles (); if (max_num_files > 0 && aiocb_list_max_size_ > (unsigned long) max_num_files) { ACE::set_handle_limit (aiocb_list_max_size_); max_num_files = ACE::max_handles (); } if (max_num_files > 0 && aiocb_list_max_size_ > (unsigned long) max_num_files) aiocb_list_max_size_ = (unsigned long) max_num_files; ACE_DEBUG ((LM_DEBUG, "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n", aiocb_list_max_size_)); #if defined(__sgi) ACE_DEBUG((LM_DEBUG, ACE_TEXT( "SGI IRIX specific: aio_init!\n"))); //typedef struct aioinit { // int aio_threads; /* The number of aio threads to start (5) */ // int aio_locks; /* Initial number of preallocated locks (3) */ // int aio_num; /* estimated total simultanious aiobc structs (1000) */ // int aio_usedba; /* Try to use DBA for raw I/O in lio_listio (0) */ // int aio_debug; /* turn on debugging (0) */ // int aio_numusers; /* max number of user sprocs making aio_* calls (5) */ // int aio_reserved[3]; //} aioinit_t; aioinit_t aioinit; aioinit.aio_threads = 10; /* The number of aio threads to start (5) */ aioinit.aio_locks = 20; /* Initial number of preallocated locks (3) */ /* estimated total simultaneous aiobc structs (1000) */ aioinit.aio_num = aiocb_list_max_size_; aioinit.aio_usedba = 0; /* Try to use DBA for raw IO in lio_listio (0) */ aioinit.aio_debug = 0; /* turn on debugging (0) */ aioinit.aio_numusers = 100; /* max number of user sprocs making aio_* calls (5) */ aioinit.aio_reserved[0] = 0; aioinit.aio_reserved[1] = 0; aioinit.aio_reserved[2] = 0; aio_sgi_init (&aioinit); #endif return; } void ACE_POSIX_AIOCB_Proactor::create_notify_manager (void) { // Remember! this issues a Asynch_Read // on the notify pipe for doing the Asynch_Accept/Connect. if (aiocb_notify_pipe_manager_ == 0) ACE_NEW (aiocb_notify_pipe_manager_, ACE_AIOCB_Notify_Pipe_Manager (this)); } void ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void) { // We are responsible for delete as all pointers set to 0 after // delete, it is save to delete twice delete aiocb_notify_pipe_manager_; aiocb_notify_pipe_manager_ = 0; } int ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value &wait_time) { // Decrement with the amount of time spent in the method ACE_Countdown_Time countdown (&wait_time); return this->handle_events_i (wait_time.msec ()); } int ACE_POSIX_AIOCB_Proactor::handle_events (void) { return this->handle_events_i (ACE_INFINITE); } int ACE_POSIX_AIOCB_Proactor::notify_completion(int sig_num) { ACE_UNUSED_ARG (sig_num); return this->aiocb_notify_pipe_manager_->notify (); } int ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result) { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1)); int ret_val = this->putq_result (result); return ret_val; } int ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result) { // this protected method should be called with locked mutex_ // we can't use GUARD as Proactor uses non-recursive mutex if (!result) return -1; int sig_num = result->signal_number (); int ret_val = this->result_queue_.enqueue_tail (result); if (ret_val == -1) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"), -1); this->notify_completion (sig_num); return 0; } ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result (void) { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0)); ACE_POSIX_Asynch_Result* result = 0; if (this->result_queue_.dequeue_head (result) != 0) return 0; // don't waste time if queue is empty - it is normal // or check queue size before dequeue_head // ACE_ERROR_RETURN ((LM_ERROR, // ACE_TEXT("%N:%l:(%P | %t):%p\n"), // ACE_TEXT("ACE_POSIX_AIOCB_Proactor::getq_result failed")), // 0); return result; } int ACE_POSIX_AIOCB_Proactor::clear_result_queue (void) { int ret_val = 0; ACE_POSIX_Asynch_Result* result = 0; while ((result = this->getq_result ()) != 0) { delete result; ret_val++; } return ret_val; } int ACE_POSIX_AIOCB_Proactor::process_result_queue (void) { int ret_val = 0; ACE_POSIX_Asynch_Result* result = 0; while ((result = this->getq_result ()) != 0) { this->application_specific_code (result, result->bytes_transferred(), // 0, No bytes transferred. 0, // No completion key. result->error()); //0, No error. ret_val++; } return ret_val; } int ACE_POSIX_AIOCB_Proactor::handle_events_i (u_long milli_seconds) { int result_suspend = 0; int retval= 0; if (milli_seconds == ACE_INFINITE) // Indefinite blocking. result_suspend = aio_suspend (aiocb_list_, aiocb_list_max_size_, 0); else { // Block on for timespec timeout; timeout.tv_sec = milli_seconds / 1000; timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000; result_suspend = aio_suspend (aiocb_list_, aiocb_list_max_size_, &timeout); } // Check for errors if (result_suspend == -1) { if (errno != EAGAIN && // Timeout errno != EINTR ) // Interrupted call ACE_ERROR ((LM_ERROR, ACE_TEXT ("%N:%l:(%P|%t)::%p\n"), ACE_TEXT ("handle_events: aio_suspend failed"))); // let continue work // we should check "post_completed" queue } else { size_t index = 0; size_t count = aiocb_list_max_size_; // max number to iterate int error_status = 0; size_t transfer_count = 0; for (;; retval++) { ACE_POSIX_Asynch_Result *asynch_result = find_completed_aio (error_status, transfer_count, index, count); if (asynch_result == 0) break; // Call the application code. this->application_specific_code (asynch_result, transfer_count, 0, // No completion key. error_status); } } // process post_completed results retval += this->process_result_queue (); return retval > 0 ? 1 : 0; } int ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result, int &error_status, size_t &transfer_count) { transfer_count = 0; // Get the error status of the aio_ operation. error_status = aio_error (asynch_result); if (error_status == EINPROGRESS) return 0; // not completed ssize_t op_return = aio_return (asynch_result); if (op_return > 0) transfer_count = static_cast (op_return); // else transfer_count is already 0, error_status reports the error. return 1; // completed } ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, size_t &transfer_count, size_t &index, size_t &count) { // parameter index defines initial slot to scan // parameter count tells us how many slots should we scan ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0)); ACE_POSIX_Asynch_Result *asynch_result = 0; if (num_started_aio_ == 0) // save time return 0; for (; count > 0; index++ , count--) { if (index >= aiocb_list_max_size_) // like a wheel index = 0; if (aiocb_list_[index] == 0) // Dont process null blocks. continue; if (0 != this->get_result_status (result_list_[index], error_status, transfer_count)) // completed break; } // end for if (count == 0) // all processed , nothing found return 0; asynch_result = result_list_[index]; aiocb_list_[index] = 0; result_list_[index] = 0; aiocb_list_cur_size_--; num_started_aio_--; // decrement count active aios index++; // for next iteration count--; // for next iteration this->start_deferred_aio (); //make attempt to start deferred AIO //It is safe as we are protected by mutex_ return asynch_result; } int ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, ACE_POSIX_Proactor::Opcode op) { ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio"); ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1)); int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0; if (result == 0) // Just check the status of the list return ret_val; // Save operation code in the aiocb switch (op) { case ACE_POSIX_Proactor::ACE_OPCODE_READ: result->aio_lio_opcode = LIO_READ; break; case ACE_POSIX_Proactor::ACE_OPCODE_WRITE: result->aio_lio_opcode = LIO_WRITE; break; default: ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%N:%l:(%P|%t)::") ACE_TEXT ("start_aio: Invalid op code %d\n"), op), -1); } if (ret_val != 0) // No free slot { errno = EAGAIN; return -1; } // Find a free slot and store. ssize_t slot = allocate_aio_slot (result); if (slot < 0) return -1; size_t index = static_cast (slot); result_list_[index] = result; //Store result ptr anyway aiocb_list_cur_size_++; ret_val = start_aio_i (result); switch (ret_val) { case 0: // started OK aiocb_list_[index] = result; return 0; case 1: // OS AIO queue overflow num_deferred_aiocb_ ++; return 0; default: // Invalid request, there is no point break; // to start it later } result_list_[index] = 0; aiocb_list_cur_size_--; return -1; } ssize_t ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) { size_t i = 0; // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager // so make check for ACE_AIOCB_Notify_Pipe_Manager request if (notify_pipe_read_handle_ == result->aio_fildes) // Notify_Pipe ? { // should be free, if (result_list_[i] != 0) // only 1 request { // is allowed errno = EAGAIN; ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:" "internal Proactor error 0\n"), -1); } } else //try to find free slot as usual, but starting from 1 { for (i= 1; i < this->aiocb_list_max_size_; i++) if (result_list_[i] == 0) break; } if (i >= this->aiocb_list_max_size_) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:" "internal Proactor error 1\n"), -1); //setup OS notification methods for this aio result->aio_sigevent.sigev_notify = SIGEV_NONE; return static_cast (i); } // start_aio_i has new return codes // 0 AIO was started successfully // 1 AIO was not started, OS AIO queue overflow // -1 AIO was not started, other errors int ACE_POSIX_AIOCB_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result) { ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i"); int ret_val; const ACE_TCHAR *ptype = 0; // Start IO switch (result->aio_lio_opcode ) { case LIO_READ : ptype = ACE_TEXT ("read "); ret_val = aio_read (result); break; case LIO_WRITE : ptype = ACE_TEXT ("write"); ret_val = aio_write (result); break; default: ptype = ACE_TEXT ("?????"); ret_val = -1; break; } if (ret_val == 0) { ++this->num_started_aio_; } else // if (ret_val == -1) { if (errno == EAGAIN || errno == ENOMEM) //Ok, it will be deferred AIO ret_val = 1; else ACE_ERROR ((LM_ERROR, ACE_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"), ptype, ACE_TEXT ("queueing failed"))); } return ret_val; } int ACE_POSIX_AIOCB_Proactor::start_deferred_aio () { ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio"); // This protected method is called from // find_completed_aio after any AIO completion // We should call this method always with locked // ACE_POSIX_AIOCB_Proactor::mutex_ // // It tries to start the first deferred AIO // if such exists if (num_deferred_aiocb_ == 0) return 0; // nothing to do size_t i = 0; for (i= 0; i < this->aiocb_list_max_size_; i++) if (result_list_[i] !=0 // check for && aiocb_list_[i] ==0) // deferred AIO break; if (i >= this->aiocb_list_max_size_) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" "start_deferred_aio:" "internal Proactor error 3\n"), -1); ACE_POSIX_Asynch_Result *result = result_list_[i]; int ret_val = start_aio_i (result); switch (ret_val) { case 0 : //started OK , decrement count of deferred AIOs aiocb_list_[i] = result; num_deferred_aiocb_ --; return 0; case 1 : return 0; //try again later default : // Invalid Parameters , should never be break; } //AL notify user result_list_[i] = 0; --aiocb_list_cur_size_; --num_deferred_aiocb_; result->set_error (errno); result->set_bytes_transferred (0); this->putq_result (result); // we are with locked mutex_ here ! return -1; } int ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle) { // This new method should be called from // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel // It scans the result_list_ and defines all AIO requests // that were issued for handle "handle" // // For all deferred AIO requests with handle "handle" // it removes its from the lists and notifies user // // For all running AIO requests with handle "handle" // it calls ::aio_cancel. According to the POSIX standards // we will receive ECANCELED for all ::aio_canceled AIO requests // later on return from ::aio_suspend ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio"); int num_total = 0; int num_cancelled = 0; { ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1)); size_t ai = 0; for (ai = 0; ai < this->aiocb_list_max_size_; ai++) { if (this->result_list_[ai] == 0) // Skip empty slot continue; if (this->result_list_[ai]->aio_fildes != handle) // Not ours continue; ++num_total; ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai]; if (this->aiocb_list_[ai] == 0) // Canceling a deferred operation { num_cancelled++; this->num_deferred_aiocb_--; this->aiocb_list_[ai] = 0; this->result_list_[ai] = 0; this->aiocb_list_cur_size_--; asynch_result->set_error (ECANCELED); asynch_result->set_bytes_transferred (0); this->putq_result (asynch_result); // we are with locked mutex_ here ! } else // Cancel started aio { int rc_cancel = this->cancel_aiocb (asynch_result); if (rc_cancel == 0) //notification in the future num_cancelled++; //it is OS responsiblity } } } // release mutex_ if (num_total == 0) return 1; // ALLDONE if (num_cancelled == num_total) return 0; // CANCELLED return 2; // NOT CANCELLED } int ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result) { // This method is called from cancel_aio // to cancel a previously submitted AIO request int rc = ::aio_cancel (0, result); // Check the return value and return 0/1/2 appropriately. if (rc == AIO_CANCELED) return 0; else if (rc == AIO_ALLDONE) return 1; else // (rc == AIO_NOTCANCELED) return 2; } // ********************************************************************* #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS) ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations) : ACE_POSIX_AIOCB_Proactor (max_aio_operations, ACE_POSIX_Proactor::PROACTOR_SIG) { // = Set up the mask we'll use to block waiting for SIGRTMIN. Use that // to add it to the signal mask for this thread, and also set the process // signal action to pass signal information when we want it. // Clear the signal set. ACE_OS::sigemptyset (&this->RT_completion_signals_); // Add the signal number to the signal set. if (ACE_OS::sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT ("ACE_POSIX_SIG_Proactor: %p\n"), ACE_TEXT ("sigaddset"))); this->block_signals (); // Set up the signal action for SIGRTMIN. this->setup_signal_handler (ACE_SIGRTMIN); // we do not have to create notify manager // but we should start pseudo-asynchronous accept task // one per all future acceptors this->get_asynch_pseudo_task().start (); return; } ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set, size_t max_aio_operations) : ACE_POSIX_AIOCB_Proactor (max_aio_operations, ACE_POSIX_Proactor::PROACTOR_SIG) { // = Keep with the Proactor, mask all the signals and // setup signal actions for the signals in the . // = Keep with the Proactor. // Empty the signal set first. if (sigemptyset (&this->RT_completion_signals_) == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT("Error:(%P | %t):%p\n"), ACE_TEXT("sigemptyset failed"))); // For each signal number present in the , add it to // the signal set we use, and also set up its process signal action // to allow signal info to be passed into sigwait/sigtimedwait. int member = 0; for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++) { member = sigismember (&signal_set, si); if (member == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT("%N:%l:(%P | %t)::%p\n"), ACE_TEXT("ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:") ACE_TEXT("sigismember failed"))); else if (member == 1) { sigaddset (&this->RT_completion_signals_, si); this->setup_signal_handler (si); } } // Mask all the signals. this->block_signals (); // we do not have to create notify manager // but we should start pseudo-asynchronous accept task // one per all future acceptors this->get_asynch_pseudo_task().start (); return; } ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void) { this->close (); // @@ Enable the masked signals again. } ACE_POSIX_Proactor::Proactor_Type ACE_POSIX_SIG_Proactor::get_impl_type (void) { return PROACTOR_SIG; } int ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value &wait_time) { // Decrement with the amount of time spent in the method ACE_Countdown_Time countdown (&wait_time); return this->handle_events_i (&wait_time); } int ACE_POSIX_SIG_Proactor::handle_events (void) { return this->handle_events_i (0); } int ACE_POSIX_SIG_Proactor::notify_completion (int sig_num) { // Get this process id. pid_t const pid = ACE_OS::getpid (); if (pid == (pid_t) -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT("Error:%N:%l(%P | %t):%p"), ACE_TEXT(" failed")), -1); // Set the signal information. sigval value; #if defined (ACE_HAS_SIGVAL_SIGVAL_INT) value.sigval_int = -1; #else value.sival_int = -1; #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */ // Queue the signal. if (sigqueue (pid, sig_num, value) == 0) return 0; if (errno != EAGAIN) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT("Error:%N:%l:(%P | %t):%p\n"), ACE_TEXT(" failed")), -1); return -1; } ACE_Asynch_Result_Impl * ACE_POSIX_SIG_Proactor::create_asynch_timer (const ACE_Handler::Proxy_Ptr &handler_proxy, const void *act, const ACE_Time_Value &tv, ACE_HANDLE event, int priority, int signal_number) { int is_member = 0; // Fix the signal number. if (signal_number == -1) { int si; for (si = ACE_SIGRTMAX; (is_member == 0) && (si >= ACE_SIGRTMIN); si--) { is_member = sigismember (&this->RT_completion_signals_, si); if (is_member == -1) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::%s\n", "ACE_POSIX_SIG_Proactor::create_asynch_timer:" "sigismember failed"), 0); } if (is_member == 0) ACE_ERROR_RETURN ((LM_ERROR, "Error:%N:%l:(%P | %t)::%s\n", "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:" "Signal mask set empty"), 0); else // + 1 to nullify loop increment. signal_number = si + 1; } ACE_Asynch_Result_Impl *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Timer (handler_proxy, act, tv, event, priority, signal_number), 0); return implementation; } #if 0 static void sig_handler (int sig_num, siginfo_t *, ucontext_t *) { // Should never be called ACE_DEBUG ((LM_DEBUG, "%N:%l:(%P | %t)::sig_handler received signal: %d\n", sig_num)); } #endif /*if 0*/ int ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const { // Set up the specified signal so that signal information will be // passed to sigwaitinfo/sigtimedwait. Don't change the default // signal handler - having a handler and waiting for the signal can // produce undefined behavior. // But can not use SIG_DFL // With SIG_DFL after delivering the first signal // SIG_DFL handler resets SA_SIGINFO flags // and we will lose all information sig_info // At least all SunOS have such behavior #if 0 struct sigaction reaction; sigemptyset (&reaction.sa_mask); // Nothing else to mask. reaction.sa_flags = SA_SIGINFO; // Realtime flag. reaction.sa_sigaction = ACE_SIGNAL_C_FUNC (sig_handler); // (SIG_DFL); int sigaction_return = ACE_OS::sigaction (signal_number, &reaction, 0); if (sigaction_return == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT("Error:%p\n"), ACE_TEXT("Proactor couldnt do sigaction for the RT SIGNAL")), -1); #else ACE_UNUSED_ARG(signal_number); #endif return 0; } int ACE_POSIX_SIG_Proactor::block_signals (void) const { return ACE_OS::pthread_sigmask (SIG_BLOCK, &this->RT_completion_signals_, 0); } ssize_t ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) { size_t i = 0; //try to find free slot as usual, starting from 0 for (i = 0; i < this->aiocb_list_max_size_; i++) if (result_list_[i] == 0) break; if (i >= this->aiocb_list_max_size_) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" "ACE_POSIX_SIG_Proactor::allocate_aio_slot " "internal Proactor error 1\n"), -1); // setup OS notification methods for this aio // store index!!, not pointer in signal info result->aio_sigevent.sigev_notify = SIGEV_SIGNAL; result->aio_sigevent.sigev_signo = result->signal_number (); #if defined (ACE_HAS_SIGVAL_SIGVAL_INT) result->aio_sigevent.sigev_value.sigval_int = static_cast (i); #else result->aio_sigevent.sigev_value.sival_int = static_cast (i); #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */ return static_cast (i); } int ACE_POSIX_SIG_Proactor::handle_events_i (const ACE_Time_Value *timeout) { int result_sigwait = 0; siginfo_t sig_info; do { // Wait for the signals. if (timeout == 0) { result_sigwait = ACE_OS::sigwaitinfo (&this->RT_completion_signals_, &sig_info); } else { result_sigwait = ACE_OS::sigtimedwait (&this->RT_completion_signals_, &sig_info, timeout); if (result_sigwait == -1 && errno == EAGAIN) return 0; } } while (result_sigwait == -1 && errno == EINTR); if (result_sigwait == -1) // Not a timeout, not EINTR: tell caller of error return -1; // Decide what to do. We always check the completion queue since it's an // easy, quick check. What is decided here is whether to check for // I/O completions and, if so, how completely to scan. int flg_aio = 0; // 1 if AIO Completion possible size_t index = 0; // start index to scan aiocb list size_t count = 1; // max number of aiocbs to scan int error_status = 0; size_t transfer_count = 0; if (sig_info.si_code == SI_ASYNCIO || this->os_id_ == ACE_OS_SUN_56) { flg_aio = 1; // AIO signal received // define index to start // nothing will happen if it contains garbage #if defined (ACE_HAS_SIGVAL_SIGVAL_INT) index = static_cast (sig_info.si_value.sigval_int); #else index = static_cast (sig_info.si_value.sival_int); #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */ // Assume we have a correctly-functioning implementation, and that // there is one I/O to process, and it's correctly specified in the // siginfo received. There are, however, some special situations // where this isn't true... if (os_id_ == ACE_OS_SUN_56) // Solaris 6 { // 1. Solaris 6 always loses any RT signal, // if it has more SIGQUEMAX=32 pending signals // so we should scan the whole aiocb list // 2. Moreover,it has one more bad habit // to notify aio completion // with SI_QUEUE code instead of SI_ASYNCIO, hence the // OS_SUN_56 addition to the si_code check, above. count = aiocb_list_max_size_; } } else if (sig_info.si_code != SI_QUEUE) { // Unknown signal code. // may some other third-party libraries could send it // or message queue could also generate it ! // So print the message and check our completions ACE_ERROR ((LM_DEBUG, ACE_TEXT ("%N:%l:(%P | %t): ") ACE_TEXT ("ACE_POSIX_SIG_Proactor::handle_events: ") ACE_TEXT ("Unexpected signal code (%d) returned ") ACE_TEXT ("from sigwait; expecting %d\n"), result_sigwait, sig_info.si_code)); flg_aio = 1; } int ret_aio = 0; int ret_que = 0; if (flg_aio) for (;; ret_aio++) { ACE_POSIX_Asynch_Result *asynch_result = find_completed_aio (error_status, transfer_count, index, count); if (asynch_result == 0) break; // Call the application code. this->application_specific_code (asynch_result, transfer_count, 0, // No completion key. error_status); // Error } // process post_completed results ret_que = this->process_result_queue (); // Uncomment this if you want to test // and research the behavior of you system #if 0 ACE_DEBUG ((LM_DEBUG, "(%t) NumAIO=%d NumQueue=%d\n", ret_aio, ret_que)); #endif return ret_aio + ret_que > 0 ? 1 : 0; } #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */ // ********************************************************************* ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer (const ACE_Handler::Proxy_Ptr &handler_proxy, const void *act, const ACE_Time_Value &tv, ACE_HANDLE event, int priority, int signal_number) : ACE_POSIX_Asynch_Result (handler_proxy, act, event, 0, 0, priority, signal_number), time_ (tv) { } void ACE_POSIX_Asynch_Timer::complete (size_t /* bytes_transferred */, int /* success */, const void * /* completion_key */, u_long /* error */) { ACE_Handler *handler = this->handler_proxy_.get ()->handler (); if (handler != 0) handler->handle_time_out (this->time_, this->act ()); } // ********************************************************************* ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion (const ACE_Handler::Proxy_Ptr &handler_proxy, const void *act, ACE_HANDLE event, int priority, int signal_number) : ACE_Asynch_Result_Impl (), ACE_POSIX_Asynch_Result (handler_proxy, act, event, 0, 0, priority, signal_number) { } ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void) { } void ACE_POSIX_Wakeup_Completion::complete (size_t /* bytes_transferred */, int /* success */, const void * /* completion_key */, u_long /* error */) { ACE_Handler *handler = this->handler_proxy_.get ()->handler (); if (handler != 0) handler->handle_wakeup (); } ACE_END_VERSIONED_NAMESPACE_DECL #endif /* ACE_HAS_AIO_CALLS */