From c33034967b23c1a49ef2d6e9906814bbcd610a77 Mon Sep 17 00:00:00 2001 From: irfan Date: Mon, 11 Aug 1997 15:21:59 +0000 Subject: *** empty log message *** --- examples/Reactor/ReactorEx/reactorex.mak | 143 ++++++++++++++++++++--- examples/Reactor/ReactorEx/reactorex.mdp | Bin 72704 -> 78848 bytes examples/Reactor/ReactorEx/test_reactorEx.cpp | 138 ++++++++++++---------- examples/Reactor/WFMO_Reactor/reactorex.mak | 143 ++++++++++++++++++++--- examples/Reactor/WFMO_Reactor/reactorex.mdp | Bin 72704 -> 78848 bytes examples/Reactor/WFMO_Reactor/test_reactorEx.cpp | 138 ++++++++++++---------- 6 files changed, 406 insertions(+), 156 deletions(-) diff --git a/examples/Reactor/ReactorEx/reactorex.mak b/examples/Reactor/ReactorEx/reactorex.mak index 85bb469600c..b64c91567d2 100644 --- a/examples/Reactor/ReactorEx/reactorex.mak +++ b/examples/Reactor/ReactorEx/reactorex.mak @@ -76,8 +76,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/ntalk.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -134,8 +132,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/test_remove_handler.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -192,8 +188,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/test_timeout.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -250,8 +244,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/test_MT.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -310,8 +302,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/test_exception.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -371,8 +361,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/simple.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -431,8 +419,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/network.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -460,6 +446,9 @@ LINK32_OBJS= \ !ENDIF +CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ + /Fp"$(INTDIR)/ntalk.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c + .c{$(CPP_OBJS)}.obj: $(CPP) $(CPP_PROJ) $< @@ -488,10 +477,12 @@ LINK32_OBJS= \ SOURCE=.\test_reactorEx.cpp DEP_CPP_TEST_=\ {$(INCLUDE)}"\ace\ACE.h"\ + {$(INCLUDE)}"\ace\ACE.i"\ {$(INCLUDE)}"\ace\Addr.h"\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -637,10 +628,12 @@ DEP_CPP_TEST_=\ SOURCE=.\test_remove_handler.cpp DEP_CPP_TEST_R=\ {$(INCLUDE)}"\ace\ACE.h"\ + {$(INCLUDE)}"\ace\ACE.i"\ {$(INCLUDE)}"\ace\Addr.h"\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -774,6 +767,7 @@ DEP_CPP_TEST_T=\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -902,10 +896,12 @@ DEP_CPP_TEST_T=\ SOURCE=.\test_MT.cpp DEP_CPP_TEST_M=\ {$(INCLUDE)}"\ace\ACE.h"\ + {$(INCLUDE)}"\ace\ACE.i"\ {$(INCLUDE)}"\ace\Addr.h"\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -1052,6 +1048,7 @@ DEP_CPP_TEST_E=\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -1179,7 +1176,124 @@ DEP_CPP_TEST_E=\ SOURCE=.\simple_test.cpp DEP_CPP_SIMPL=\ + {$(INCLUDE)}"\ace\ACE.h"\ + {$(INCLUDE)}"\ace\ACE.i"\ + {$(INCLUDE)}"\ace\Addr.h"\ + {$(INCLUDE)}"\ace\Addr.i"\ + {$(INCLUDE)}"\ace\Asynch_IO.h"\ + {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ + {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ + {$(INCLUDE)}"\ace\Auto_Ptr.h"\ + {$(INCLUDE)}"\ace\Auto_Ptr.i"\ + {$(INCLUDE)}"\ace\config-win32-common.h"\ + {$(INCLUDE)}"\ace\config-win32.h"\ + {$(INCLUDE)}"\ace\config.h"\ + {$(INCLUDE)}"\ace\Containers.cpp"\ + {$(INCLUDE)}"\ace\Containers.h"\ + {$(INCLUDE)}"\ace\Containers.i"\ + {$(INCLUDE)}"\ace\Event_Handler.h"\ + {$(INCLUDE)}"\ace\Event_Handler.i"\ + {$(INCLUDE)}"\ace\Free_List.cpp"\ + {$(INCLUDE)}"\ace\Free_List.h"\ + {$(INCLUDE)}"\ace\Free_List.i"\ + {$(INCLUDE)}"\ace\Handle_Set.h"\ + {$(INCLUDE)}"\ace\Handle_Set.i"\ + {$(INCLUDE)}"\ace\Hash_Map_Manager.cpp"\ + {$(INCLUDE)}"\ace\Hash_Map_Manager.h"\ + {$(INCLUDE)}"\ace\High_Res_Timer.h"\ + {$(INCLUDE)}"\ace\High_Res_Timer.i"\ + {$(INCLUDE)}"\ace\INET_Addr.h"\ + {$(INCLUDE)}"\ace\INET_Addr.i"\ + {$(INCLUDE)}"\ace\IO_Cntl_Msg.h"\ + {$(INCLUDE)}"\ace\IPC_SAP.h"\ + {$(INCLUDE)}"\ace\IPC_SAP.i"\ + {$(INCLUDE)}"\ace\Local_Tokens.h"\ + {$(INCLUDE)}"\ace\Local_Tokens.i"\ + {$(INCLUDE)}"\ace\Log_Msg.h"\ + {$(INCLUDE)}"\ace\Log_Priority.h"\ + {$(INCLUDE)}"\ace\Log_Record.h"\ + {$(INCLUDE)}"\ace\Log_Record.i"\ + {$(INCLUDE)}"\ace\Malloc.h"\ + {$(INCLUDE)}"\ace\Malloc.i"\ + {$(INCLUDE)}"\ace\Malloc_T.cpp"\ + {$(INCLUDE)}"\ace\Malloc_T.h"\ + {$(INCLUDE)}"\ace\Malloc_T.i"\ + {$(INCLUDE)}"\ace\Map_Manager.cpp"\ + {$(INCLUDE)}"\ace\Map_Manager.h"\ + {$(INCLUDE)}"\ace\Map_Manager.i"\ + {$(INCLUDE)}"\ace\Mem_Map.h"\ + {$(INCLUDE)}"\ace\Mem_Map.i"\ + {$(INCLUDE)}"\ace\Memory_Pool.h"\ + {$(INCLUDE)}"\ace\Memory_Pool.i"\ + {$(INCLUDE)}"\ace\Message_Block.h"\ + {$(INCLUDE)}"\ace\Message_Block.i"\ + {$(INCLUDE)}"\ace\Message_Queue.cpp"\ + {$(INCLUDE)}"\ace\Message_Queue.h"\ + {$(INCLUDE)}"\ace\Message_Queue.i"\ + {$(INCLUDE)}"\ace\OS.h"\ + {$(INCLUDE)}"\ace\OS.i"\ + {$(INCLUDE)}"\ace\Pipe.h"\ + {$(INCLUDE)}"\ace\Pipe.i"\ + {$(INCLUDE)}"\ace\Proactor.h"\ + {$(INCLUDE)}"\ace\Proactor.i"\ + {$(INCLUDE)}"\ace\Reactor.h"\ + {$(INCLUDE)}"\ace\Reactor.i"\ {$(INCLUDE)}"\ace\ReactorEx.h"\ + {$(INCLUDE)}"\ace\ReactorEx.i"\ + {$(INCLUDE)}"\ace\Service_Config.h"\ + {$(INCLUDE)}"\ace\Service_Config.i"\ + {$(INCLUDE)}"\ace\Service_Object.h"\ + {$(INCLUDE)}"\ace\Service_Object.i"\ + {$(INCLUDE)}"\ace\Shared_Object.h"\ + {$(INCLUDE)}"\ace\Shared_Object.i"\ + {$(INCLUDE)}"\ace\Signal.h"\ + {$(INCLUDE)}"\ace\Signal.i"\ + {$(INCLUDE)}"\ace\SOCK.h"\ + {$(INCLUDE)}"\ace\SOCK.i"\ + {$(INCLUDE)}"\ace\SOCK_IO.h"\ + {$(INCLUDE)}"\ace\SOCK_IO.i"\ + {$(INCLUDE)}"\ace\SOCK_Stream.h"\ + {$(INCLUDE)}"\ace\SOCK_Stream.i"\ + {$(INCLUDE)}"\ace\SString.h"\ + {$(INCLUDE)}"\ace\SString.i"\ + {$(INCLUDE)}"\ace\stdcpp.h"\ + {$(INCLUDE)}"\ace\Strategies.h"\ + {$(INCLUDE)}"\ace\Strategies_T.cpp"\ + {$(INCLUDE)}"\ace\Strategies_T.h"\ + {$(INCLUDE)}"\ace\SV_Semaphore_Complex.h"\ + {$(INCLUDE)}"\ace\SV_Semaphore_Complex.i"\ + {$(INCLUDE)}"\ace\SV_Semaphore_Simple.h"\ + {$(INCLUDE)}"\ace\SV_Semaphore_Simple.i"\ + {$(INCLUDE)}"\ace\Svc_Conf_Tokens.h"\ + {$(INCLUDE)}"\ace\Synch.h"\ + {$(INCLUDE)}"\ace\Synch.i"\ + {$(INCLUDE)}"\ace\Synch_Options.h"\ + {$(INCLUDE)}"\ace\Synch_T.cpp"\ + {$(INCLUDE)}"\ace\Synch_T.h"\ + {$(INCLUDE)}"\ace\Synch_T.i"\ + {$(INCLUDE)}"\ace\Thread.h"\ + {$(INCLUDE)}"\ace\Thread.i"\ + {$(INCLUDE)}"\ace\Thread_Manager.h"\ + {$(INCLUDE)}"\ace\Thread_Manager.i"\ + {$(INCLUDE)}"\ace\Time_Value.h"\ + {$(INCLUDE)}"\ace\Timer_Heap.h"\ + {$(INCLUDE)}"\ace\Timer_Heap_T.cpp"\ + {$(INCLUDE)}"\ace\Timer_Heap_T.h"\ + {$(INCLUDE)}"\ace\Timer_List.h"\ + {$(INCLUDE)}"\ace\Timer_List_T.cpp"\ + {$(INCLUDE)}"\ace\Timer_List_T.h"\ + {$(INCLUDE)}"\ace\Timer_Queue.h"\ + {$(INCLUDE)}"\ace\Timer_Queue_T.cpp"\ + {$(INCLUDE)}"\ace\Timer_Queue_T.h"\ + {$(INCLUDE)}"\ace\Timer_Queue_T.i"\ + {$(INCLUDE)}"\ace\Timer_Wheel.h"\ + {$(INCLUDE)}"\ace\Timer_Wheel_T.cpp"\ + {$(INCLUDE)}"\ace\Timer_Wheel_T.h"\ + {$(INCLUDE)}"\ace\Token.h"\ + {$(INCLUDE)}"\ace\Token.i"\ + {$(INCLUDE)}"\ace\Trace.h"\ + {$(INCLUDE)}"\ace\ws2tcpip.h"\ "$(INTDIR)\simple_test.obj" : $(SOURCE) $(DEP_CPP_SIMPL) "$(INTDIR)" @@ -1202,6 +1316,7 @@ DEP_CPP_NETWO=\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ diff --git a/examples/Reactor/ReactorEx/reactorex.mdp b/examples/Reactor/ReactorEx/reactorex.mdp index 1e2bde0a14b..8abb3eb3924 100644 Binary files a/examples/Reactor/ReactorEx/reactorex.mdp and b/examples/Reactor/ReactorEx/reactorex.mdp differ diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp index 0ca4563ea25..5075437be94 100644 --- a/examples/Reactor/ReactorEx/test_reactorEx.cpp +++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp @@ -36,7 +36,7 @@ typedef ACE_Task MT_TASK; -class Peer_Handler : public MT_TASK +class Peer_Handler : public MT_TASK, public ACE_Handler // = TITLE // Connect to a server. Receive messages from STDIN_Handler // and forward them to the server using proactive I/O. @@ -51,22 +51,21 @@ public: // It does blocking connects and accepts depending on whether a // hostname was specified from the command line. - virtual int handle_output_complete (ACE_Message_Block *msg, - long bytes_transferred); - // One of our asynchronous writes to the remote peer has completed. - // Make sure it succeeded and then delete the message. - - virtual int handle_input_complete (ACE_Message_Block *msg, - long bytes_transferred); + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // This method will be called when an asynchronous read completes on a stream. // The remote peer has sent us something. If it succeeded, print // out the message and reinitiate a read. Otherwise, fail. In both // cases, delete the message sent. - virtual ACE_Message_Block *get_message (void); - // This is so the Proactor can get a message to read into. + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This method will be called when an asynchronous write completes on a strea_m. + // One of our asynchronous writes to the remote peer has completed. + // Make sure it succeeded and then delete the message. - virtual ACE_HANDLE get_handle (void) const; - // This is so the Proactor can get our handle. + virtual ACE_HANDLE handle (void) const; + // Get the I/O handle used by this . This method will be + // called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is + // passed to . virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); // We've been removed from the ReactorEx. @@ -90,6 +89,15 @@ private: u_short port_; // Port number for remote host. + + ACE_Asynch_Read_Stream rd_stream_; + // Read stream + + ACE_Asynch_Write_Stream wr_stream_; + // Write stream + + ACE_Message_Block mb_; + // Message Block for reading from the network }; class STDIN_Handler : public ACE_Task @@ -134,7 +142,8 @@ Peer_Handler::Peer_Handler (int argc, char *argv[]) port_ (ACE_DEFAULT_SERVER_PORT), strategy_ (ACE_ReactorEx::instance (), this, - ACE_Event_Handler::WRITE_MASK) + ACE_Event_Handler::WRITE_MASK), + mb_ (BUFSIZ) { // This code sets up the message to notify us when a new message is // added to the queue. Actually, the queue notifies ReactorEx which @@ -178,7 +187,7 @@ Peer_Handler::open (void *) if (connector.connect (stream_, addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); - ACE_DEBUG ((LM_DEBUG, "connected.\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n")); } else // Acceptor { @@ -189,45 +198,51 @@ Peer_Handler::open (void *) (acceptor.accept (this->stream_) == -1)) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); - ACE_DEBUG ((LM_DEBUG, "accepted.\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n")); } - return ACE_Proactor::instance ()->initiate - (this, ACE_Event_Handler::READ_MASK); + int result = this->rd_stream_.open (*this); + if (result != 0) + return result; + + result = this->wr_stream_.open (*this); + if (result != 0) + return result; + + result = this->rd_stream_.read (this->mb_, + this->mb_.size ()); + return result; } // One of our asynchronous writes to the remote peer has completed. // Make sure it succeeded and then delete the message. -int -Peer_Handler::handle_output_complete (ACE_Message_Block *msg, - long bytes_transferred) +void +Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { - if (bytes_transferred <= 0) - ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed", - bytes_transferred)); - - // This was allocated by the STDIN_Handler, queued, dequeued, - // passed to the proactor, and now passed back to us. - msg->release (); - return 0; // Do not reinvoke a send. + if (result.bytes_transferred () <= 0) + ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed", + result.bytes_transferred ())); + + // This was allocated by the STDIN_Handler, queued, dequeued, passed + // to the proactor, and now passed back to us. + result.message_block ().release (); } // The remote peer has sent us something. If it succeeded, print // out the message and reinitiate a read. Otherwise, fail. In both // cases, delete the message sent. -int -Peer_Handler::handle_input_complete (ACE_Message_Block *msg, - long bytes_transferred) + +void +Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { - int result = 1; // Reinvokes the recv() operation by default! - - if (bytes_transferred > 0 && msg->length () > 0) + if (result.bytes_transferred () > 0 && + this->mb_.length () > 0) { - msg->rd_ptr ()[bytes_transferred] = '\0'; + this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0'; // Print out the message received from the server. - ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ())); } else { @@ -235,29 +250,21 @@ Peer_Handler::handle_input_complete (ACE_Message_Block *msg, // went away. We will end the event loop. Since we're in the // main thread, we don't need to do a notify. ACE_ReactorEx::end_event_loop(); - result = -1; + return; } - msg->release (); - return result; -} - -// This is so the Proactor can get a message to read into. - -ACE_Message_Block * -Peer_Handler::get_message (void) -{ - // An extra byte for NUL termination. - ACE_Message_Block *message = - new ACE_Message_Block (BUFSIZ + 1); + // Reset pointers + this->mb_.wr_ptr (-result.bytes_transferred ()); - message->size (BUFSIZ); - return message; + // Start off another read + if (this->rd_stream_.read (this->mb_, + this->mb_.size ()) == -1) + ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler")); } // This is so the Proactor can get our handle. ACE_HANDLE -Peer_Handler::get_handle (void) const +Peer_Handler::handle (void) const { return this->stream_.get_handle (); } @@ -266,7 +273,7 @@ Peer_Handler::get_handle (void) const int Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) { - ACE_DEBUG ((LM_DEBUG, "Peer_Handler closing down\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n")); return 0; } @@ -281,9 +288,9 @@ Peer_Handler::handle_output (ACE_HANDLE fd) // Forward the message to the remote peer receiver. if (this->getq (mb, &tv) != -1) { - if (ACE_Proactor::instance ()-> - initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1) - ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler")); + if (this->wr_stream_.write (*mb, + mb->length ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1); } return 0; } @@ -291,7 +298,7 @@ Peer_Handler::handle_output (ACE_HANDLE fd) void STDIN_Handler::handler (int signum) { - ACE_DEBUG ((LM_DEBUG, "signal = %S\n", signum)); + ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum)); } STDIN_Handler::STDIN_Handler (MT_TASK &ph) @@ -355,7 +362,10 @@ STDIN_Handler::svc (void) this->ph_.putq (mb); } else - break; + { + mb->release (); + break; + } } // handle_signal will get called on the main proactor thread since @@ -369,17 +379,13 @@ void STDIN_Handler::register_thread_exit_hook (void) { // Get a real handle to our thread. - ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_); + ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_); // Register ourselves to get called back when our thread exits. if (ACE_ReactorEx::instance ()-> register_handler (this, this->thr_handle_) == -1) ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n")); - - // We're in another thread, so we need to notify the ReactorEx so - // that it wakes up and waits on the new set of handles. - ACE_ReactorEx::instance ()->notify (); } // The STDIN thread has exited. This means the user hit ^C. We can @@ -388,7 +394,7 @@ STDIN_Handler::register_thread_exit_hook (void) int STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) { - ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) STDIN thread has exited.\n")); ACE_ASSERT (this->thr_handle_ == si->si_handle_); ACE_ReactorEx::end_event_loop(); return 0; @@ -397,6 +403,10 @@ STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) int main (int argc, char *argv[]) { + // Let the proactor know that it will be used with ReactorEx + ACE_Proactor proactor (0, 0, 1); + ACE_Proactor::instance (&proactor); + // Open handler for remote peer communications this will run from // the main thread. Peer_Handler peer_handler (argc, argv); diff --git a/examples/Reactor/WFMO_Reactor/reactorex.mak b/examples/Reactor/WFMO_Reactor/reactorex.mak index 85bb469600c..b64c91567d2 100644 --- a/examples/Reactor/WFMO_Reactor/reactorex.mak +++ b/examples/Reactor/WFMO_Reactor/reactorex.mak @@ -76,8 +76,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/ntalk.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -134,8 +132,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/test_remove_handler.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -192,8 +188,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/test_timeout.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -250,8 +244,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/test_MT.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -310,8 +302,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/test_exception.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -371,8 +361,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/simple.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -431,8 +419,6 @@ CLEAN : # ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c # ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c -CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ - /Fp"$(INTDIR)/network.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c CPP_OBJS=.\Debug/ CPP_SBRS=.\. # ADD BASE RSC /l 0x409 /d "_DEBUG" @@ -460,6 +446,9 @@ LINK32_OBJS= \ !ENDIF +CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\ + /Fp"$(INTDIR)/ntalk.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c + .c{$(CPP_OBJS)}.obj: $(CPP) $(CPP_PROJ) $< @@ -488,10 +477,12 @@ LINK32_OBJS= \ SOURCE=.\test_reactorEx.cpp DEP_CPP_TEST_=\ {$(INCLUDE)}"\ace\ACE.h"\ + {$(INCLUDE)}"\ace\ACE.i"\ {$(INCLUDE)}"\ace\Addr.h"\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -637,10 +628,12 @@ DEP_CPP_TEST_=\ SOURCE=.\test_remove_handler.cpp DEP_CPP_TEST_R=\ {$(INCLUDE)}"\ace\ACE.h"\ + {$(INCLUDE)}"\ace\ACE.i"\ {$(INCLUDE)}"\ace\Addr.h"\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -774,6 +767,7 @@ DEP_CPP_TEST_T=\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -902,10 +896,12 @@ DEP_CPP_TEST_T=\ SOURCE=.\test_MT.cpp DEP_CPP_TEST_M=\ {$(INCLUDE)}"\ace\ACE.h"\ + {$(INCLUDE)}"\ace\ACE.i"\ {$(INCLUDE)}"\ace\Addr.h"\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -1052,6 +1048,7 @@ DEP_CPP_TEST_E=\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ @@ -1179,7 +1176,124 @@ DEP_CPP_TEST_E=\ SOURCE=.\simple_test.cpp DEP_CPP_SIMPL=\ + {$(INCLUDE)}"\ace\ACE.h"\ + {$(INCLUDE)}"\ace\ACE.i"\ + {$(INCLUDE)}"\ace\Addr.h"\ + {$(INCLUDE)}"\ace\Addr.i"\ + {$(INCLUDE)}"\ace\Asynch_IO.h"\ + {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ + {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ + {$(INCLUDE)}"\ace\Auto_Ptr.h"\ + {$(INCLUDE)}"\ace\Auto_Ptr.i"\ + {$(INCLUDE)}"\ace\config-win32-common.h"\ + {$(INCLUDE)}"\ace\config-win32.h"\ + {$(INCLUDE)}"\ace\config.h"\ + {$(INCLUDE)}"\ace\Containers.cpp"\ + {$(INCLUDE)}"\ace\Containers.h"\ + {$(INCLUDE)}"\ace\Containers.i"\ + {$(INCLUDE)}"\ace\Event_Handler.h"\ + {$(INCLUDE)}"\ace\Event_Handler.i"\ + {$(INCLUDE)}"\ace\Free_List.cpp"\ + {$(INCLUDE)}"\ace\Free_List.h"\ + {$(INCLUDE)}"\ace\Free_List.i"\ + {$(INCLUDE)}"\ace\Handle_Set.h"\ + {$(INCLUDE)}"\ace\Handle_Set.i"\ + {$(INCLUDE)}"\ace\Hash_Map_Manager.cpp"\ + {$(INCLUDE)}"\ace\Hash_Map_Manager.h"\ + {$(INCLUDE)}"\ace\High_Res_Timer.h"\ + {$(INCLUDE)}"\ace\High_Res_Timer.i"\ + {$(INCLUDE)}"\ace\INET_Addr.h"\ + {$(INCLUDE)}"\ace\INET_Addr.i"\ + {$(INCLUDE)}"\ace\IO_Cntl_Msg.h"\ + {$(INCLUDE)}"\ace\IPC_SAP.h"\ + {$(INCLUDE)}"\ace\IPC_SAP.i"\ + {$(INCLUDE)}"\ace\Local_Tokens.h"\ + {$(INCLUDE)}"\ace\Local_Tokens.i"\ + {$(INCLUDE)}"\ace\Log_Msg.h"\ + {$(INCLUDE)}"\ace\Log_Priority.h"\ + {$(INCLUDE)}"\ace\Log_Record.h"\ + {$(INCLUDE)}"\ace\Log_Record.i"\ + {$(INCLUDE)}"\ace\Malloc.h"\ + {$(INCLUDE)}"\ace\Malloc.i"\ + {$(INCLUDE)}"\ace\Malloc_T.cpp"\ + {$(INCLUDE)}"\ace\Malloc_T.h"\ + {$(INCLUDE)}"\ace\Malloc_T.i"\ + {$(INCLUDE)}"\ace\Map_Manager.cpp"\ + {$(INCLUDE)}"\ace\Map_Manager.h"\ + {$(INCLUDE)}"\ace\Map_Manager.i"\ + {$(INCLUDE)}"\ace\Mem_Map.h"\ + {$(INCLUDE)}"\ace\Mem_Map.i"\ + {$(INCLUDE)}"\ace\Memory_Pool.h"\ + {$(INCLUDE)}"\ace\Memory_Pool.i"\ + {$(INCLUDE)}"\ace\Message_Block.h"\ + {$(INCLUDE)}"\ace\Message_Block.i"\ + {$(INCLUDE)}"\ace\Message_Queue.cpp"\ + {$(INCLUDE)}"\ace\Message_Queue.h"\ + {$(INCLUDE)}"\ace\Message_Queue.i"\ + {$(INCLUDE)}"\ace\OS.h"\ + {$(INCLUDE)}"\ace\OS.i"\ + {$(INCLUDE)}"\ace\Pipe.h"\ + {$(INCLUDE)}"\ace\Pipe.i"\ + {$(INCLUDE)}"\ace\Proactor.h"\ + {$(INCLUDE)}"\ace\Proactor.i"\ + {$(INCLUDE)}"\ace\Reactor.h"\ + {$(INCLUDE)}"\ace\Reactor.i"\ {$(INCLUDE)}"\ace\ReactorEx.h"\ + {$(INCLUDE)}"\ace\ReactorEx.i"\ + {$(INCLUDE)}"\ace\Service_Config.h"\ + {$(INCLUDE)}"\ace\Service_Config.i"\ + {$(INCLUDE)}"\ace\Service_Object.h"\ + {$(INCLUDE)}"\ace\Service_Object.i"\ + {$(INCLUDE)}"\ace\Shared_Object.h"\ + {$(INCLUDE)}"\ace\Shared_Object.i"\ + {$(INCLUDE)}"\ace\Signal.h"\ + {$(INCLUDE)}"\ace\Signal.i"\ + {$(INCLUDE)}"\ace\SOCK.h"\ + {$(INCLUDE)}"\ace\SOCK.i"\ + {$(INCLUDE)}"\ace\SOCK_IO.h"\ + {$(INCLUDE)}"\ace\SOCK_IO.i"\ + {$(INCLUDE)}"\ace\SOCK_Stream.h"\ + {$(INCLUDE)}"\ace\SOCK_Stream.i"\ + {$(INCLUDE)}"\ace\SString.h"\ + {$(INCLUDE)}"\ace\SString.i"\ + {$(INCLUDE)}"\ace\stdcpp.h"\ + {$(INCLUDE)}"\ace\Strategies.h"\ + {$(INCLUDE)}"\ace\Strategies_T.cpp"\ + {$(INCLUDE)}"\ace\Strategies_T.h"\ + {$(INCLUDE)}"\ace\SV_Semaphore_Complex.h"\ + {$(INCLUDE)}"\ace\SV_Semaphore_Complex.i"\ + {$(INCLUDE)}"\ace\SV_Semaphore_Simple.h"\ + {$(INCLUDE)}"\ace\SV_Semaphore_Simple.i"\ + {$(INCLUDE)}"\ace\Svc_Conf_Tokens.h"\ + {$(INCLUDE)}"\ace\Synch.h"\ + {$(INCLUDE)}"\ace\Synch.i"\ + {$(INCLUDE)}"\ace\Synch_Options.h"\ + {$(INCLUDE)}"\ace\Synch_T.cpp"\ + {$(INCLUDE)}"\ace\Synch_T.h"\ + {$(INCLUDE)}"\ace\Synch_T.i"\ + {$(INCLUDE)}"\ace\Thread.h"\ + {$(INCLUDE)}"\ace\Thread.i"\ + {$(INCLUDE)}"\ace\Thread_Manager.h"\ + {$(INCLUDE)}"\ace\Thread_Manager.i"\ + {$(INCLUDE)}"\ace\Time_Value.h"\ + {$(INCLUDE)}"\ace\Timer_Heap.h"\ + {$(INCLUDE)}"\ace\Timer_Heap_T.cpp"\ + {$(INCLUDE)}"\ace\Timer_Heap_T.h"\ + {$(INCLUDE)}"\ace\Timer_List.h"\ + {$(INCLUDE)}"\ace\Timer_List_T.cpp"\ + {$(INCLUDE)}"\ace\Timer_List_T.h"\ + {$(INCLUDE)}"\ace\Timer_Queue.h"\ + {$(INCLUDE)}"\ace\Timer_Queue_T.cpp"\ + {$(INCLUDE)}"\ace\Timer_Queue_T.h"\ + {$(INCLUDE)}"\ace\Timer_Queue_T.i"\ + {$(INCLUDE)}"\ace\Timer_Wheel.h"\ + {$(INCLUDE)}"\ace\Timer_Wheel_T.cpp"\ + {$(INCLUDE)}"\ace\Timer_Wheel_T.h"\ + {$(INCLUDE)}"\ace\Token.h"\ + {$(INCLUDE)}"\ace\Token.i"\ + {$(INCLUDE)}"\ace\Trace.h"\ + {$(INCLUDE)}"\ace\ws2tcpip.h"\ "$(INTDIR)\simple_test.obj" : $(SOURCE) $(DEP_CPP_SIMPL) "$(INTDIR)" @@ -1202,6 +1316,7 @@ DEP_CPP_NETWO=\ {$(INCLUDE)}"\ace\Addr.i"\ {$(INCLUDE)}"\ace\Asynch_IO.h"\ {$(INCLUDE)}"\ace\Asynch_IO.i"\ + {$(INCLUDE)}"\ace\Atomic_Op.i"\ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\ {$(INCLUDE)}"\ace\Auto_Ptr.h"\ {$(INCLUDE)}"\ace\Auto_Ptr.i"\ diff --git a/examples/Reactor/WFMO_Reactor/reactorex.mdp b/examples/Reactor/WFMO_Reactor/reactorex.mdp index 1e2bde0a14b..8abb3eb3924 100644 Binary files a/examples/Reactor/WFMO_Reactor/reactorex.mdp and b/examples/Reactor/WFMO_Reactor/reactorex.mdp differ diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp index 0ca4563ea25..5075437be94 100644 --- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp +++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp @@ -36,7 +36,7 @@ typedef ACE_Task MT_TASK; -class Peer_Handler : public MT_TASK +class Peer_Handler : public MT_TASK, public ACE_Handler // = TITLE // Connect to a server. Receive messages from STDIN_Handler // and forward them to the server using proactive I/O. @@ -51,22 +51,21 @@ public: // It does blocking connects and accepts depending on whether a // hostname was specified from the command line. - virtual int handle_output_complete (ACE_Message_Block *msg, - long bytes_transferred); - // One of our asynchronous writes to the remote peer has completed. - // Make sure it succeeded and then delete the message. - - virtual int handle_input_complete (ACE_Message_Block *msg, - long bytes_transferred); + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // This method will be called when an asynchronous read completes on a stream. // The remote peer has sent us something. If it succeeded, print // out the message and reinitiate a read. Otherwise, fail. In both // cases, delete the message sent. - virtual ACE_Message_Block *get_message (void); - // This is so the Proactor can get a message to read into. + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This method will be called when an asynchronous write completes on a strea_m. + // One of our asynchronous writes to the remote peer has completed. + // Make sure it succeeded and then delete the message. - virtual ACE_HANDLE get_handle (void) const; - // This is so the Proactor can get our handle. + virtual ACE_HANDLE handle (void) const; + // Get the I/O handle used by this . This method will be + // called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is + // passed to . virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); // We've been removed from the ReactorEx. @@ -90,6 +89,15 @@ private: u_short port_; // Port number for remote host. + + ACE_Asynch_Read_Stream rd_stream_; + // Read stream + + ACE_Asynch_Write_Stream wr_stream_; + // Write stream + + ACE_Message_Block mb_; + // Message Block for reading from the network }; class STDIN_Handler : public ACE_Task @@ -134,7 +142,8 @@ Peer_Handler::Peer_Handler (int argc, char *argv[]) port_ (ACE_DEFAULT_SERVER_PORT), strategy_ (ACE_ReactorEx::instance (), this, - ACE_Event_Handler::WRITE_MASK) + ACE_Event_Handler::WRITE_MASK), + mb_ (BUFSIZ) { // This code sets up the message to notify us when a new message is // added to the queue. Actually, the queue notifies ReactorEx which @@ -178,7 +187,7 @@ Peer_Handler::open (void *) if (connector.connect (stream_, addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); - ACE_DEBUG ((LM_DEBUG, "connected.\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n")); } else // Acceptor { @@ -189,45 +198,51 @@ Peer_Handler::open (void *) (acceptor.accept (this->stream_) == -1)) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); - ACE_DEBUG ((LM_DEBUG, "accepted.\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n")); } - return ACE_Proactor::instance ()->initiate - (this, ACE_Event_Handler::READ_MASK); + int result = this->rd_stream_.open (*this); + if (result != 0) + return result; + + result = this->wr_stream_.open (*this); + if (result != 0) + return result; + + result = this->rd_stream_.read (this->mb_, + this->mb_.size ()); + return result; } // One of our asynchronous writes to the remote peer has completed. // Make sure it succeeded and then delete the message. -int -Peer_Handler::handle_output_complete (ACE_Message_Block *msg, - long bytes_transferred) +void +Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { - if (bytes_transferred <= 0) - ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed", - bytes_transferred)); - - // This was allocated by the STDIN_Handler, queued, dequeued, - // passed to the proactor, and now passed back to us. - msg->release (); - return 0; // Do not reinvoke a send. + if (result.bytes_transferred () <= 0) + ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed", + result.bytes_transferred ())); + + // This was allocated by the STDIN_Handler, queued, dequeued, passed + // to the proactor, and now passed back to us. + result.message_block ().release (); } // The remote peer has sent us something. If it succeeded, print // out the message and reinitiate a read. Otherwise, fail. In both // cases, delete the message sent. -int -Peer_Handler::handle_input_complete (ACE_Message_Block *msg, - long bytes_transferred) + +void +Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { - int result = 1; // Reinvokes the recv() operation by default! - - if (bytes_transferred > 0 && msg->length () > 0) + if (result.bytes_transferred () > 0 && + this->mb_.length () > 0) { - msg->rd_ptr ()[bytes_transferred] = '\0'; + this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0'; // Print out the message received from the server. - ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ())); } else { @@ -235,29 +250,21 @@ Peer_Handler::handle_input_complete (ACE_Message_Block *msg, // went away. We will end the event loop. Since we're in the // main thread, we don't need to do a notify. ACE_ReactorEx::end_event_loop(); - result = -1; + return; } - msg->release (); - return result; -} - -// This is so the Proactor can get a message to read into. - -ACE_Message_Block * -Peer_Handler::get_message (void) -{ - // An extra byte for NUL termination. - ACE_Message_Block *message = - new ACE_Message_Block (BUFSIZ + 1); + // Reset pointers + this->mb_.wr_ptr (-result.bytes_transferred ()); - message->size (BUFSIZ); - return message; + // Start off another read + if (this->rd_stream_.read (this->mb_, + this->mb_.size ()) == -1) + ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler")); } // This is so the Proactor can get our handle. ACE_HANDLE -Peer_Handler::get_handle (void) const +Peer_Handler::handle (void) const { return this->stream_.get_handle (); } @@ -266,7 +273,7 @@ Peer_Handler::get_handle (void) const int Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) { - ACE_DEBUG ((LM_DEBUG, "Peer_Handler closing down\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n")); return 0; } @@ -281,9 +288,9 @@ Peer_Handler::handle_output (ACE_HANDLE fd) // Forward the message to the remote peer receiver. if (this->getq (mb, &tv) != -1) { - if (ACE_Proactor::instance ()-> - initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1) - ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler")); + if (this->wr_stream_.write (*mb, + mb->length ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1); } return 0; } @@ -291,7 +298,7 @@ Peer_Handler::handle_output (ACE_HANDLE fd) void STDIN_Handler::handler (int signum) { - ACE_DEBUG ((LM_DEBUG, "signal = %S\n", signum)); + ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum)); } STDIN_Handler::STDIN_Handler (MT_TASK &ph) @@ -355,7 +362,10 @@ STDIN_Handler::svc (void) this->ph_.putq (mb); } else - break; + { + mb->release (); + break; + } } // handle_signal will get called on the main proactor thread since @@ -369,17 +379,13 @@ void STDIN_Handler::register_thread_exit_hook (void) { // Get a real handle to our thread. - ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_); + ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_); // Register ourselves to get called back when our thread exits. if (ACE_ReactorEx::instance ()-> register_handler (this, this->thr_handle_) == -1) ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n")); - - // We're in another thread, so we need to notify the ReactorEx so - // that it wakes up and waits on the new set of handles. - ACE_ReactorEx::instance ()->notify (); } // The STDIN thread has exited. This means the user hit ^C. We can @@ -388,7 +394,7 @@ STDIN_Handler::register_thread_exit_hook (void) int STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) { - ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) STDIN thread has exited.\n")); ACE_ASSERT (this->thr_handle_ == si->si_handle_); ACE_ReactorEx::end_event_loop(); return 0; @@ -397,6 +403,10 @@ STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) int main (int argc, char *argv[]) { + // Let the proactor know that it will be used with ReactorEx + ACE_Proactor proactor (0, 0, 1); + ACE_Proactor::instance (&proactor); + // Open handler for remote peer communications this will run from // the main thread. Peer_Handler peer_handler (argc, argv); -- cgit v1.2.1