summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/Reactor/ReactorEx/reactorex.mak143
-rw-r--r--examples/Reactor/ReactorEx/reactorex.mdpbin72704 -> 78848 bytes
-rw-r--r--examples/Reactor/ReactorEx/test_reactorEx.cpp138
-rw-r--r--examples/Reactor/WFMO_Reactor/reactorex.mak143
-rw-r--r--examples/Reactor/WFMO_Reactor/reactorex.mdpbin72704 -> 78848 bytes
-rw-r--r--examples/Reactor/WFMO_Reactor/test_reactorEx.cpp138
6 files changed, 406 insertions, 156 deletions
diff --git a/examples/Reactor/ReactorEx/reactorex.mak b/examples/Reactor/ReactorEx/reactorex.mak
index 85bb469600c..b64c91567d2 100644
--- a/examples/Reactor/ReactorEx/reactorex.mak
+++ b/examples/Reactor/ReactorEx/reactorex.mak
@@ -76,8 +76,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/ntalk.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -134,8 +132,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/test_remove_handler.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -192,8 +188,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/test_timeout.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -250,8 +244,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/test_MT.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -310,8 +302,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/test_exception.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -371,8 +361,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/simple.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -431,8 +419,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/network.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -460,6 +446,9 @@ LINK32_OBJS= \
!ENDIF
+CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
+ /Fp"$(INTDIR)/ntalk.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
+
.c{$(CPP_OBJS)}.obj:
$(CPP) $(CPP_PROJ) $<
@@ -488,10 +477,12 @@ LINK32_OBJS= \
SOURCE=.\test_reactorEx.cpp
DEP_CPP_TEST_=\
{$(INCLUDE)}"\ace\ACE.h"\
+ {$(INCLUDE)}"\ace\ACE.i"\
{$(INCLUDE)}"\ace\Addr.h"\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -637,10 +628,12 @@ DEP_CPP_TEST_=\
SOURCE=.\test_remove_handler.cpp
DEP_CPP_TEST_R=\
{$(INCLUDE)}"\ace\ACE.h"\
+ {$(INCLUDE)}"\ace\ACE.i"\
{$(INCLUDE)}"\ace\Addr.h"\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -774,6 +767,7 @@ DEP_CPP_TEST_T=\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -902,10 +896,12 @@ DEP_CPP_TEST_T=\
SOURCE=.\test_MT.cpp
DEP_CPP_TEST_M=\
{$(INCLUDE)}"\ace\ACE.h"\
+ {$(INCLUDE)}"\ace\ACE.i"\
{$(INCLUDE)}"\ace\Addr.h"\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -1052,6 +1048,7 @@ DEP_CPP_TEST_E=\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -1179,7 +1176,124 @@ DEP_CPP_TEST_E=\
SOURCE=.\simple_test.cpp
DEP_CPP_SIMPL=\
+ {$(INCLUDE)}"\ace\ACE.h"\
+ {$(INCLUDE)}"\ace\ACE.i"\
+ {$(INCLUDE)}"\ace\Addr.h"\
+ {$(INCLUDE)}"\ace\Addr.i"\
+ {$(INCLUDE)}"\ace\Asynch_IO.h"\
+ {$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
+ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
+ {$(INCLUDE)}"\ace\Auto_Ptr.h"\
+ {$(INCLUDE)}"\ace\Auto_Ptr.i"\
+ {$(INCLUDE)}"\ace\config-win32-common.h"\
+ {$(INCLUDE)}"\ace\config-win32.h"\
+ {$(INCLUDE)}"\ace\config.h"\
+ {$(INCLUDE)}"\ace\Containers.cpp"\
+ {$(INCLUDE)}"\ace\Containers.h"\
+ {$(INCLUDE)}"\ace\Containers.i"\
+ {$(INCLUDE)}"\ace\Event_Handler.h"\
+ {$(INCLUDE)}"\ace\Event_Handler.i"\
+ {$(INCLUDE)}"\ace\Free_List.cpp"\
+ {$(INCLUDE)}"\ace\Free_List.h"\
+ {$(INCLUDE)}"\ace\Free_List.i"\
+ {$(INCLUDE)}"\ace\Handle_Set.h"\
+ {$(INCLUDE)}"\ace\Handle_Set.i"\
+ {$(INCLUDE)}"\ace\Hash_Map_Manager.cpp"\
+ {$(INCLUDE)}"\ace\Hash_Map_Manager.h"\
+ {$(INCLUDE)}"\ace\High_Res_Timer.h"\
+ {$(INCLUDE)}"\ace\High_Res_Timer.i"\
+ {$(INCLUDE)}"\ace\INET_Addr.h"\
+ {$(INCLUDE)}"\ace\INET_Addr.i"\
+ {$(INCLUDE)}"\ace\IO_Cntl_Msg.h"\
+ {$(INCLUDE)}"\ace\IPC_SAP.h"\
+ {$(INCLUDE)}"\ace\IPC_SAP.i"\
+ {$(INCLUDE)}"\ace\Local_Tokens.h"\
+ {$(INCLUDE)}"\ace\Local_Tokens.i"\
+ {$(INCLUDE)}"\ace\Log_Msg.h"\
+ {$(INCLUDE)}"\ace\Log_Priority.h"\
+ {$(INCLUDE)}"\ace\Log_Record.h"\
+ {$(INCLUDE)}"\ace\Log_Record.i"\
+ {$(INCLUDE)}"\ace\Malloc.h"\
+ {$(INCLUDE)}"\ace\Malloc.i"\
+ {$(INCLUDE)}"\ace\Malloc_T.cpp"\
+ {$(INCLUDE)}"\ace\Malloc_T.h"\
+ {$(INCLUDE)}"\ace\Malloc_T.i"\
+ {$(INCLUDE)}"\ace\Map_Manager.cpp"\
+ {$(INCLUDE)}"\ace\Map_Manager.h"\
+ {$(INCLUDE)}"\ace\Map_Manager.i"\
+ {$(INCLUDE)}"\ace\Mem_Map.h"\
+ {$(INCLUDE)}"\ace\Mem_Map.i"\
+ {$(INCLUDE)}"\ace\Memory_Pool.h"\
+ {$(INCLUDE)}"\ace\Memory_Pool.i"\
+ {$(INCLUDE)}"\ace\Message_Block.h"\
+ {$(INCLUDE)}"\ace\Message_Block.i"\
+ {$(INCLUDE)}"\ace\Message_Queue.cpp"\
+ {$(INCLUDE)}"\ace\Message_Queue.h"\
+ {$(INCLUDE)}"\ace\Message_Queue.i"\
+ {$(INCLUDE)}"\ace\OS.h"\
+ {$(INCLUDE)}"\ace\OS.i"\
+ {$(INCLUDE)}"\ace\Pipe.h"\
+ {$(INCLUDE)}"\ace\Pipe.i"\
+ {$(INCLUDE)}"\ace\Proactor.h"\
+ {$(INCLUDE)}"\ace\Proactor.i"\
+ {$(INCLUDE)}"\ace\Reactor.h"\
+ {$(INCLUDE)}"\ace\Reactor.i"\
{$(INCLUDE)}"\ace\ReactorEx.h"\
+ {$(INCLUDE)}"\ace\ReactorEx.i"\
+ {$(INCLUDE)}"\ace\Service_Config.h"\
+ {$(INCLUDE)}"\ace\Service_Config.i"\
+ {$(INCLUDE)}"\ace\Service_Object.h"\
+ {$(INCLUDE)}"\ace\Service_Object.i"\
+ {$(INCLUDE)}"\ace\Shared_Object.h"\
+ {$(INCLUDE)}"\ace\Shared_Object.i"\
+ {$(INCLUDE)}"\ace\Signal.h"\
+ {$(INCLUDE)}"\ace\Signal.i"\
+ {$(INCLUDE)}"\ace\SOCK.h"\
+ {$(INCLUDE)}"\ace\SOCK.i"\
+ {$(INCLUDE)}"\ace\SOCK_IO.h"\
+ {$(INCLUDE)}"\ace\SOCK_IO.i"\
+ {$(INCLUDE)}"\ace\SOCK_Stream.h"\
+ {$(INCLUDE)}"\ace\SOCK_Stream.i"\
+ {$(INCLUDE)}"\ace\SString.h"\
+ {$(INCLUDE)}"\ace\SString.i"\
+ {$(INCLUDE)}"\ace\stdcpp.h"\
+ {$(INCLUDE)}"\ace\Strategies.h"\
+ {$(INCLUDE)}"\ace\Strategies_T.cpp"\
+ {$(INCLUDE)}"\ace\Strategies_T.h"\
+ {$(INCLUDE)}"\ace\SV_Semaphore_Complex.h"\
+ {$(INCLUDE)}"\ace\SV_Semaphore_Complex.i"\
+ {$(INCLUDE)}"\ace\SV_Semaphore_Simple.h"\
+ {$(INCLUDE)}"\ace\SV_Semaphore_Simple.i"\
+ {$(INCLUDE)}"\ace\Svc_Conf_Tokens.h"\
+ {$(INCLUDE)}"\ace\Synch.h"\
+ {$(INCLUDE)}"\ace\Synch.i"\
+ {$(INCLUDE)}"\ace\Synch_Options.h"\
+ {$(INCLUDE)}"\ace\Synch_T.cpp"\
+ {$(INCLUDE)}"\ace\Synch_T.h"\
+ {$(INCLUDE)}"\ace\Synch_T.i"\
+ {$(INCLUDE)}"\ace\Thread.h"\
+ {$(INCLUDE)}"\ace\Thread.i"\
+ {$(INCLUDE)}"\ace\Thread_Manager.h"\
+ {$(INCLUDE)}"\ace\Thread_Manager.i"\
+ {$(INCLUDE)}"\ace\Time_Value.h"\
+ {$(INCLUDE)}"\ace\Timer_Heap.h"\
+ {$(INCLUDE)}"\ace\Timer_Heap_T.cpp"\
+ {$(INCLUDE)}"\ace\Timer_Heap_T.h"\
+ {$(INCLUDE)}"\ace\Timer_List.h"\
+ {$(INCLUDE)}"\ace\Timer_List_T.cpp"\
+ {$(INCLUDE)}"\ace\Timer_List_T.h"\
+ {$(INCLUDE)}"\ace\Timer_Queue.h"\
+ {$(INCLUDE)}"\ace\Timer_Queue_T.cpp"\
+ {$(INCLUDE)}"\ace\Timer_Queue_T.h"\
+ {$(INCLUDE)}"\ace\Timer_Queue_T.i"\
+ {$(INCLUDE)}"\ace\Timer_Wheel.h"\
+ {$(INCLUDE)}"\ace\Timer_Wheel_T.cpp"\
+ {$(INCLUDE)}"\ace\Timer_Wheel_T.h"\
+ {$(INCLUDE)}"\ace\Token.h"\
+ {$(INCLUDE)}"\ace\Token.i"\
+ {$(INCLUDE)}"\ace\Trace.h"\
+ {$(INCLUDE)}"\ace\ws2tcpip.h"\
"$(INTDIR)\simple_test.obj" : $(SOURCE) $(DEP_CPP_SIMPL) "$(INTDIR)"
@@ -1202,6 +1316,7 @@ DEP_CPP_NETWO=\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
diff --git a/examples/Reactor/ReactorEx/reactorex.mdp b/examples/Reactor/ReactorEx/reactorex.mdp
index 1e2bde0a14b..8abb3eb3924 100644
--- a/examples/Reactor/ReactorEx/reactorex.mdp
+++ b/examples/Reactor/ReactorEx/reactorex.mdp
Binary files differ
diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp
index 0ca4563ea25..5075437be94 100644
--- a/examples/Reactor/ReactorEx/test_reactorEx.cpp
+++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp
@@ -36,7 +36,7 @@
typedef ACE_Task<ACE_MT_SYNCH> MT_TASK;
-class Peer_Handler : public MT_TASK
+class Peer_Handler : public MT_TASK, public ACE_Handler
// = TITLE
// Connect to a server. Receive messages from STDIN_Handler
// and forward them to the server using proactive I/O.
@@ -51,22 +51,21 @@ public:
// It does blocking connects and accepts depending on whether a
// hostname was specified from the command line.
- virtual int handle_output_complete (ACE_Message_Block *msg,
- long bytes_transferred);
- // One of our asynchronous writes to the remote peer has completed.
- // Make sure it succeeded and then delete the message.
-
- virtual int handle_input_complete (ACE_Message_Block *msg,
- long bytes_transferred);
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // This method will be called when an asynchronous read completes on a stream.
// The remote peer has sent us something. If it succeeded, print
// out the message and reinitiate a read. Otherwise, fail. In both
// cases, delete the message sent.
- virtual ACE_Message_Block *get_message (void);
- // This is so the Proactor can get a message to read into.
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This method will be called when an asynchronous write completes on a strea_m.
+ // One of our asynchronous writes to the remote peer has completed.
+ // Make sure it succeeded and then delete the message.
- virtual ACE_HANDLE get_handle (void) const;
- // This is so the Proactor can get our handle.
+ virtual ACE_HANDLE handle (void) const;
+ // Get the I/O handle used by this <handler>. This method will be
+ // called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is
+ // passed to <open>.
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
// We've been removed from the ReactorEx.
@@ -90,6 +89,15 @@ private:
u_short port_;
// Port number for remote host.
+
+ ACE_Asynch_Read_Stream rd_stream_;
+ // Read stream
+
+ ACE_Asynch_Write_Stream wr_stream_;
+ // Write stream
+
+ ACE_Message_Block mb_;
+ // Message Block for reading from the network
};
class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH>
@@ -134,7 +142,8 @@ Peer_Handler::Peer_Handler (int argc, char *argv[])
port_ (ACE_DEFAULT_SERVER_PORT),
strategy_ (ACE_ReactorEx::instance (),
this,
- ACE_Event_Handler::WRITE_MASK)
+ ACE_Event_Handler::WRITE_MASK),
+ mb_ (BUFSIZ)
{
// This code sets up the message to notify us when a new message is
// added to the queue. Actually, the queue notifies ReactorEx which
@@ -178,7 +187,7 @@ Peer_Handler::open (void *)
if (connector.connect (stream_, addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1);
- ACE_DEBUG ((LM_DEBUG, "connected.\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n"));
}
else // Acceptor
{
@@ -189,45 +198,51 @@ Peer_Handler::open (void *)
(acceptor.accept (this->stream_) == -1))
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1);
- ACE_DEBUG ((LM_DEBUG, "accepted.\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n"));
}
- return ACE_Proactor::instance ()->initiate
- (this, ACE_Event_Handler::READ_MASK);
+ int result = this->rd_stream_.open (*this);
+ if (result != 0)
+ return result;
+
+ result = this->wr_stream_.open (*this);
+ if (result != 0)
+ return result;
+
+ result = this->rd_stream_.read (this->mb_,
+ this->mb_.size ());
+ return result;
}
// One of our asynchronous writes to the remote peer has completed.
// Make sure it succeeded and then delete the message.
-int
-Peer_Handler::handle_output_complete (ACE_Message_Block *msg,
- long bytes_transferred)
+void
+Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
- if (bytes_transferred <= 0)
- ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed",
- bytes_transferred));
-
- // This was allocated by the STDIN_Handler, queued, dequeued,
- // passed to the proactor, and now passed back to us.
- msg->release ();
- return 0; // Do not reinvoke a send.
+ if (result.bytes_transferred () <= 0)
+ ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed",
+ result.bytes_transferred ()));
+
+ // This was allocated by the STDIN_Handler, queued, dequeued, passed
+ // to the proactor, and now passed back to us.
+ result.message_block ().release ();
}
// The remote peer has sent us something. If it succeeded, print
// out the message and reinitiate a read. Otherwise, fail. In both
// cases, delete the message sent.
-int
-Peer_Handler::handle_input_complete (ACE_Message_Block *msg,
- long bytes_transferred)
+
+void
+Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
- int result = 1; // Reinvokes the recv() operation by default!
-
- if (bytes_transferred > 0 && msg->length () > 0)
+ if (result.bytes_transferred () > 0 &&
+ this->mb_.length () > 0)
{
- msg->rd_ptr ()[bytes_transferred] = '\0';
+ this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0';
// Print out the message received from the server.
- ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ()));
}
else
{
@@ -235,29 +250,21 @@ Peer_Handler::handle_input_complete (ACE_Message_Block *msg,
// went away. We will end the event loop. Since we're in the
// main thread, we don't need to do a notify.
ACE_ReactorEx::end_event_loop();
- result = -1;
+ return;
}
- msg->release ();
- return result;
-}
-
-// This is so the Proactor can get a message to read into.
-
-ACE_Message_Block *
-Peer_Handler::get_message (void)
-{
- // An extra byte for NUL termination.
- ACE_Message_Block *message =
- new ACE_Message_Block (BUFSIZ + 1);
+ // Reset pointers
+ this->mb_.wr_ptr (-result.bytes_transferred ());
- message->size (BUFSIZ);
- return message;
+ // Start off another read
+ if (this->rd_stream_.read (this->mb_,
+ this->mb_.size ()) == -1)
+ ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler"));
}
// This is so the Proactor can get our handle.
ACE_HANDLE
-Peer_Handler::get_handle (void) const
+Peer_Handler::handle (void) const
{
return this->stream_.get_handle ();
}
@@ -266,7 +273,7 @@ Peer_Handler::get_handle (void) const
int
Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
{
- ACE_DEBUG ((LM_DEBUG, "Peer_Handler closing down\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n"));
return 0;
}
@@ -281,9 +288,9 @@ Peer_Handler::handle_output (ACE_HANDLE fd)
// Forward the message to the remote peer receiver.
if (this->getq (mb, &tv) != -1)
{
- if (ACE_Proactor::instance ()->
- initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1)
- ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"));
+ if (this->wr_stream_.write (*mb,
+ mb->length ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1);
}
return 0;
}
@@ -291,7 +298,7 @@ Peer_Handler::handle_output (ACE_HANDLE fd)
void
STDIN_Handler::handler (int signum)
{
- ACE_DEBUG ((LM_DEBUG, "signal = %S\n", signum));
+ ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum));
}
STDIN_Handler::STDIN_Handler (MT_TASK &ph)
@@ -355,7 +362,10 @@ STDIN_Handler::svc (void)
this->ph_.putq (mb);
}
else
- break;
+ {
+ mb->release ();
+ break;
+ }
}
// handle_signal will get called on the main proactor thread since
@@ -369,17 +379,13 @@ void
STDIN_Handler::register_thread_exit_hook (void)
{
// Get a real handle to our thread.
- ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_);
+ ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_);
// Register ourselves to get called back when our thread exits.
if (ACE_ReactorEx::instance ()->
register_handler (this, this->thr_handle_) == -1)
ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n"));
-
- // We're in another thread, so we need to notify the ReactorEx so
- // that it wakes up and waits on the new set of handles.
- ACE_ReactorEx::instance ()->notify ();
}
// The STDIN thread has exited. This means the user hit ^C. We can
@@ -388,7 +394,7 @@ STDIN_Handler::register_thread_exit_hook (void)
int
STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *)
{
- ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) STDIN thread has exited.\n"));
ACE_ASSERT (this->thr_handle_ == si->si_handle_);
ACE_ReactorEx::end_event_loop();
return 0;
@@ -397,6 +403,10 @@ STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *)
int
main (int argc, char *argv[])
{
+ // Let the proactor know that it will be used with ReactorEx
+ ACE_Proactor proactor (0, 0, 1);
+ ACE_Proactor::instance (&proactor);
+
// Open handler for remote peer communications this will run from
// the main thread.
Peer_Handler peer_handler (argc, argv);
diff --git a/examples/Reactor/WFMO_Reactor/reactorex.mak b/examples/Reactor/WFMO_Reactor/reactorex.mak
index 85bb469600c..b64c91567d2 100644
--- a/examples/Reactor/WFMO_Reactor/reactorex.mak
+++ b/examples/Reactor/WFMO_Reactor/reactorex.mak
@@ -76,8 +76,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/ntalk.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -134,8 +132,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/test_remove_handler.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -192,8 +188,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/test_timeout.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -250,8 +244,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/test_MT.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -310,8 +302,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/test_exception.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -371,8 +361,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/simple.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -431,8 +419,6 @@ CLEAN :
# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /YX /c
-CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
- /Fp"$(INTDIR)/network.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
CPP_OBJS=.\Debug/
CPP_SBRS=.\.
# ADD BASE RSC /l 0x409 /d "_DEBUG"
@@ -460,6 +446,9 @@ LINK32_OBJS= \
!ENDIF
+CPP_PROJ=/nologo /MDd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE"\
+ /Fp"$(INTDIR)/ntalk.pch" /YX /Fo"$(INTDIR)/" /Fd"$(INTDIR)/" /c
+
.c{$(CPP_OBJS)}.obj:
$(CPP) $(CPP_PROJ) $<
@@ -488,10 +477,12 @@ LINK32_OBJS= \
SOURCE=.\test_reactorEx.cpp
DEP_CPP_TEST_=\
{$(INCLUDE)}"\ace\ACE.h"\
+ {$(INCLUDE)}"\ace\ACE.i"\
{$(INCLUDE)}"\ace\Addr.h"\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -637,10 +628,12 @@ DEP_CPP_TEST_=\
SOURCE=.\test_remove_handler.cpp
DEP_CPP_TEST_R=\
{$(INCLUDE)}"\ace\ACE.h"\
+ {$(INCLUDE)}"\ace\ACE.i"\
{$(INCLUDE)}"\ace\Addr.h"\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -774,6 +767,7 @@ DEP_CPP_TEST_T=\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -902,10 +896,12 @@ DEP_CPP_TEST_T=\
SOURCE=.\test_MT.cpp
DEP_CPP_TEST_M=\
{$(INCLUDE)}"\ace\ACE.h"\
+ {$(INCLUDE)}"\ace\ACE.i"\
{$(INCLUDE)}"\ace\Addr.h"\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -1052,6 +1048,7 @@ DEP_CPP_TEST_E=\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
@@ -1179,7 +1176,124 @@ DEP_CPP_TEST_E=\
SOURCE=.\simple_test.cpp
DEP_CPP_SIMPL=\
+ {$(INCLUDE)}"\ace\ACE.h"\
+ {$(INCLUDE)}"\ace\ACE.i"\
+ {$(INCLUDE)}"\ace\Addr.h"\
+ {$(INCLUDE)}"\ace\Addr.i"\
+ {$(INCLUDE)}"\ace\Asynch_IO.h"\
+ {$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
+ {$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
+ {$(INCLUDE)}"\ace\Auto_Ptr.h"\
+ {$(INCLUDE)}"\ace\Auto_Ptr.i"\
+ {$(INCLUDE)}"\ace\config-win32-common.h"\
+ {$(INCLUDE)}"\ace\config-win32.h"\
+ {$(INCLUDE)}"\ace\config.h"\
+ {$(INCLUDE)}"\ace\Containers.cpp"\
+ {$(INCLUDE)}"\ace\Containers.h"\
+ {$(INCLUDE)}"\ace\Containers.i"\
+ {$(INCLUDE)}"\ace\Event_Handler.h"\
+ {$(INCLUDE)}"\ace\Event_Handler.i"\
+ {$(INCLUDE)}"\ace\Free_List.cpp"\
+ {$(INCLUDE)}"\ace\Free_List.h"\
+ {$(INCLUDE)}"\ace\Free_List.i"\
+ {$(INCLUDE)}"\ace\Handle_Set.h"\
+ {$(INCLUDE)}"\ace\Handle_Set.i"\
+ {$(INCLUDE)}"\ace\Hash_Map_Manager.cpp"\
+ {$(INCLUDE)}"\ace\Hash_Map_Manager.h"\
+ {$(INCLUDE)}"\ace\High_Res_Timer.h"\
+ {$(INCLUDE)}"\ace\High_Res_Timer.i"\
+ {$(INCLUDE)}"\ace\INET_Addr.h"\
+ {$(INCLUDE)}"\ace\INET_Addr.i"\
+ {$(INCLUDE)}"\ace\IO_Cntl_Msg.h"\
+ {$(INCLUDE)}"\ace\IPC_SAP.h"\
+ {$(INCLUDE)}"\ace\IPC_SAP.i"\
+ {$(INCLUDE)}"\ace\Local_Tokens.h"\
+ {$(INCLUDE)}"\ace\Local_Tokens.i"\
+ {$(INCLUDE)}"\ace\Log_Msg.h"\
+ {$(INCLUDE)}"\ace\Log_Priority.h"\
+ {$(INCLUDE)}"\ace\Log_Record.h"\
+ {$(INCLUDE)}"\ace\Log_Record.i"\
+ {$(INCLUDE)}"\ace\Malloc.h"\
+ {$(INCLUDE)}"\ace\Malloc.i"\
+ {$(INCLUDE)}"\ace\Malloc_T.cpp"\
+ {$(INCLUDE)}"\ace\Malloc_T.h"\
+ {$(INCLUDE)}"\ace\Malloc_T.i"\
+ {$(INCLUDE)}"\ace\Map_Manager.cpp"\
+ {$(INCLUDE)}"\ace\Map_Manager.h"\
+ {$(INCLUDE)}"\ace\Map_Manager.i"\
+ {$(INCLUDE)}"\ace\Mem_Map.h"\
+ {$(INCLUDE)}"\ace\Mem_Map.i"\
+ {$(INCLUDE)}"\ace\Memory_Pool.h"\
+ {$(INCLUDE)}"\ace\Memory_Pool.i"\
+ {$(INCLUDE)}"\ace\Message_Block.h"\
+ {$(INCLUDE)}"\ace\Message_Block.i"\
+ {$(INCLUDE)}"\ace\Message_Queue.cpp"\
+ {$(INCLUDE)}"\ace\Message_Queue.h"\
+ {$(INCLUDE)}"\ace\Message_Queue.i"\
+ {$(INCLUDE)}"\ace\OS.h"\
+ {$(INCLUDE)}"\ace\OS.i"\
+ {$(INCLUDE)}"\ace\Pipe.h"\
+ {$(INCLUDE)}"\ace\Pipe.i"\
+ {$(INCLUDE)}"\ace\Proactor.h"\
+ {$(INCLUDE)}"\ace\Proactor.i"\
+ {$(INCLUDE)}"\ace\Reactor.h"\
+ {$(INCLUDE)}"\ace\Reactor.i"\
{$(INCLUDE)}"\ace\ReactorEx.h"\
+ {$(INCLUDE)}"\ace\ReactorEx.i"\
+ {$(INCLUDE)}"\ace\Service_Config.h"\
+ {$(INCLUDE)}"\ace\Service_Config.i"\
+ {$(INCLUDE)}"\ace\Service_Object.h"\
+ {$(INCLUDE)}"\ace\Service_Object.i"\
+ {$(INCLUDE)}"\ace\Shared_Object.h"\
+ {$(INCLUDE)}"\ace\Shared_Object.i"\
+ {$(INCLUDE)}"\ace\Signal.h"\
+ {$(INCLUDE)}"\ace\Signal.i"\
+ {$(INCLUDE)}"\ace\SOCK.h"\
+ {$(INCLUDE)}"\ace\SOCK.i"\
+ {$(INCLUDE)}"\ace\SOCK_IO.h"\
+ {$(INCLUDE)}"\ace\SOCK_IO.i"\
+ {$(INCLUDE)}"\ace\SOCK_Stream.h"\
+ {$(INCLUDE)}"\ace\SOCK_Stream.i"\
+ {$(INCLUDE)}"\ace\SString.h"\
+ {$(INCLUDE)}"\ace\SString.i"\
+ {$(INCLUDE)}"\ace\stdcpp.h"\
+ {$(INCLUDE)}"\ace\Strategies.h"\
+ {$(INCLUDE)}"\ace\Strategies_T.cpp"\
+ {$(INCLUDE)}"\ace\Strategies_T.h"\
+ {$(INCLUDE)}"\ace\SV_Semaphore_Complex.h"\
+ {$(INCLUDE)}"\ace\SV_Semaphore_Complex.i"\
+ {$(INCLUDE)}"\ace\SV_Semaphore_Simple.h"\
+ {$(INCLUDE)}"\ace\SV_Semaphore_Simple.i"\
+ {$(INCLUDE)}"\ace\Svc_Conf_Tokens.h"\
+ {$(INCLUDE)}"\ace\Synch.h"\
+ {$(INCLUDE)}"\ace\Synch.i"\
+ {$(INCLUDE)}"\ace\Synch_Options.h"\
+ {$(INCLUDE)}"\ace\Synch_T.cpp"\
+ {$(INCLUDE)}"\ace\Synch_T.h"\
+ {$(INCLUDE)}"\ace\Synch_T.i"\
+ {$(INCLUDE)}"\ace\Thread.h"\
+ {$(INCLUDE)}"\ace\Thread.i"\
+ {$(INCLUDE)}"\ace\Thread_Manager.h"\
+ {$(INCLUDE)}"\ace\Thread_Manager.i"\
+ {$(INCLUDE)}"\ace\Time_Value.h"\
+ {$(INCLUDE)}"\ace\Timer_Heap.h"\
+ {$(INCLUDE)}"\ace\Timer_Heap_T.cpp"\
+ {$(INCLUDE)}"\ace\Timer_Heap_T.h"\
+ {$(INCLUDE)}"\ace\Timer_List.h"\
+ {$(INCLUDE)}"\ace\Timer_List_T.cpp"\
+ {$(INCLUDE)}"\ace\Timer_List_T.h"\
+ {$(INCLUDE)}"\ace\Timer_Queue.h"\
+ {$(INCLUDE)}"\ace\Timer_Queue_T.cpp"\
+ {$(INCLUDE)}"\ace\Timer_Queue_T.h"\
+ {$(INCLUDE)}"\ace\Timer_Queue_T.i"\
+ {$(INCLUDE)}"\ace\Timer_Wheel.h"\
+ {$(INCLUDE)}"\ace\Timer_Wheel_T.cpp"\
+ {$(INCLUDE)}"\ace\Timer_Wheel_T.h"\
+ {$(INCLUDE)}"\ace\Token.h"\
+ {$(INCLUDE)}"\ace\Token.i"\
+ {$(INCLUDE)}"\ace\Trace.h"\
+ {$(INCLUDE)}"\ace\ws2tcpip.h"\
"$(INTDIR)\simple_test.obj" : $(SOURCE) $(DEP_CPP_SIMPL) "$(INTDIR)"
@@ -1202,6 +1316,7 @@ DEP_CPP_NETWO=\
{$(INCLUDE)}"\ace\Addr.i"\
{$(INCLUDE)}"\ace\Asynch_IO.h"\
{$(INCLUDE)}"\ace\Asynch_IO.i"\
+ {$(INCLUDE)}"\ace\Atomic_Op.i"\
{$(INCLUDE)}"\ace\Auto_Ptr.cpp"\
{$(INCLUDE)}"\ace\Auto_Ptr.h"\
{$(INCLUDE)}"\ace\Auto_Ptr.i"\
diff --git a/examples/Reactor/WFMO_Reactor/reactorex.mdp b/examples/Reactor/WFMO_Reactor/reactorex.mdp
index 1e2bde0a14b..8abb3eb3924 100644
--- a/examples/Reactor/WFMO_Reactor/reactorex.mdp
+++ b/examples/Reactor/WFMO_Reactor/reactorex.mdp
Binary files differ
diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
index 0ca4563ea25..5075437be94 100644
--- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
+++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
@@ -36,7 +36,7 @@
typedef ACE_Task<ACE_MT_SYNCH> MT_TASK;
-class Peer_Handler : public MT_TASK
+class Peer_Handler : public MT_TASK, public ACE_Handler
// = TITLE
// Connect to a server. Receive messages from STDIN_Handler
// and forward them to the server using proactive I/O.
@@ -51,22 +51,21 @@ public:
// It does blocking connects and accepts depending on whether a
// hostname was specified from the command line.
- virtual int handle_output_complete (ACE_Message_Block *msg,
- long bytes_transferred);
- // One of our asynchronous writes to the remote peer has completed.
- // Make sure it succeeded and then delete the message.
-
- virtual int handle_input_complete (ACE_Message_Block *msg,
- long bytes_transferred);
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // This method will be called when an asynchronous read completes on a stream.
// The remote peer has sent us something. If it succeeded, print
// out the message and reinitiate a read. Otherwise, fail. In both
// cases, delete the message sent.
- virtual ACE_Message_Block *get_message (void);
- // This is so the Proactor can get a message to read into.
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This method will be called when an asynchronous write completes on a strea_m.
+ // One of our asynchronous writes to the remote peer has completed.
+ // Make sure it succeeded and then delete the message.
- virtual ACE_HANDLE get_handle (void) const;
- // This is so the Proactor can get our handle.
+ virtual ACE_HANDLE handle (void) const;
+ // Get the I/O handle used by this <handler>. This method will be
+ // called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is
+ // passed to <open>.
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
// We've been removed from the ReactorEx.
@@ -90,6 +89,15 @@ private:
u_short port_;
// Port number for remote host.
+
+ ACE_Asynch_Read_Stream rd_stream_;
+ // Read stream
+
+ ACE_Asynch_Write_Stream wr_stream_;
+ // Write stream
+
+ ACE_Message_Block mb_;
+ // Message Block for reading from the network
};
class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH>
@@ -134,7 +142,8 @@ Peer_Handler::Peer_Handler (int argc, char *argv[])
port_ (ACE_DEFAULT_SERVER_PORT),
strategy_ (ACE_ReactorEx::instance (),
this,
- ACE_Event_Handler::WRITE_MASK)
+ ACE_Event_Handler::WRITE_MASK),
+ mb_ (BUFSIZ)
{
// This code sets up the message to notify us when a new message is
// added to the queue. Actually, the queue notifies ReactorEx which
@@ -178,7 +187,7 @@ Peer_Handler::open (void *)
if (connector.connect (stream_, addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1);
- ACE_DEBUG ((LM_DEBUG, "connected.\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n"));
}
else // Acceptor
{
@@ -189,45 +198,51 @@ Peer_Handler::open (void *)
(acceptor.accept (this->stream_) == -1))
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1);
- ACE_DEBUG ((LM_DEBUG, "accepted.\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n"));
}
- return ACE_Proactor::instance ()->initiate
- (this, ACE_Event_Handler::READ_MASK);
+ int result = this->rd_stream_.open (*this);
+ if (result != 0)
+ return result;
+
+ result = this->wr_stream_.open (*this);
+ if (result != 0)
+ return result;
+
+ result = this->rd_stream_.read (this->mb_,
+ this->mb_.size ());
+ return result;
}
// One of our asynchronous writes to the remote peer has completed.
// Make sure it succeeded and then delete the message.
-int
-Peer_Handler::handle_output_complete (ACE_Message_Block *msg,
- long bytes_transferred)
+void
+Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
- if (bytes_transferred <= 0)
- ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed",
- bytes_transferred));
-
- // This was allocated by the STDIN_Handler, queued, dequeued,
- // passed to the proactor, and now passed back to us.
- msg->release ();
- return 0; // Do not reinvoke a send.
+ if (result.bytes_transferred () <= 0)
+ ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed",
+ result.bytes_transferred ()));
+
+ // This was allocated by the STDIN_Handler, queued, dequeued, passed
+ // to the proactor, and now passed back to us.
+ result.message_block ().release ();
}
// The remote peer has sent us something. If it succeeded, print
// out the message and reinitiate a read. Otherwise, fail. In both
// cases, delete the message sent.
-int
-Peer_Handler::handle_input_complete (ACE_Message_Block *msg,
- long bytes_transferred)
+
+void
+Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
- int result = 1; // Reinvokes the recv() operation by default!
-
- if (bytes_transferred > 0 && msg->length () > 0)
+ if (result.bytes_transferred () > 0 &&
+ this->mb_.length () > 0)
{
- msg->rd_ptr ()[bytes_transferred] = '\0';
+ this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0';
// Print out the message received from the server.
- ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ()));
}
else
{
@@ -235,29 +250,21 @@ Peer_Handler::handle_input_complete (ACE_Message_Block *msg,
// went away. We will end the event loop. Since we're in the
// main thread, we don't need to do a notify.
ACE_ReactorEx::end_event_loop();
- result = -1;
+ return;
}
- msg->release ();
- return result;
-}
-
-// This is so the Proactor can get a message to read into.
-
-ACE_Message_Block *
-Peer_Handler::get_message (void)
-{
- // An extra byte for NUL termination.
- ACE_Message_Block *message =
- new ACE_Message_Block (BUFSIZ + 1);
+ // Reset pointers
+ this->mb_.wr_ptr (-result.bytes_transferred ());
- message->size (BUFSIZ);
- return message;
+ // Start off another read
+ if (this->rd_stream_.read (this->mb_,
+ this->mb_.size ()) == -1)
+ ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler"));
}
// This is so the Proactor can get our handle.
ACE_HANDLE
-Peer_Handler::get_handle (void) const
+Peer_Handler::handle (void) const
{
return this->stream_.get_handle ();
}
@@ -266,7 +273,7 @@ Peer_Handler::get_handle (void) const
int
Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
{
- ACE_DEBUG ((LM_DEBUG, "Peer_Handler closing down\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n"));
return 0;
}
@@ -281,9 +288,9 @@ Peer_Handler::handle_output (ACE_HANDLE fd)
// Forward the message to the remote peer receiver.
if (this->getq (mb, &tv) != -1)
{
- if (ACE_Proactor::instance ()->
- initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1)
- ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"));
+ if (this->wr_stream_.write (*mb,
+ mb->length ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1);
}
return 0;
}
@@ -291,7 +298,7 @@ Peer_Handler::handle_output (ACE_HANDLE fd)
void
STDIN_Handler::handler (int signum)
{
- ACE_DEBUG ((LM_DEBUG, "signal = %S\n", signum));
+ ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum));
}
STDIN_Handler::STDIN_Handler (MT_TASK &ph)
@@ -355,7 +362,10 @@ STDIN_Handler::svc (void)
this->ph_.putq (mb);
}
else
- break;
+ {
+ mb->release ();
+ break;
+ }
}
// handle_signal will get called on the main proactor thread since
@@ -369,17 +379,13 @@ void
STDIN_Handler::register_thread_exit_hook (void)
{
// Get a real handle to our thread.
- ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_);
+ ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_);
// Register ourselves to get called back when our thread exits.
if (ACE_ReactorEx::instance ()->
register_handler (this, this->thr_handle_) == -1)
ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n"));
-
- // We're in another thread, so we need to notify the ReactorEx so
- // that it wakes up and waits on the new set of handles.
- ACE_ReactorEx::instance ()->notify ();
}
// The STDIN thread has exited. This means the user hit ^C. We can
@@ -388,7 +394,7 @@ STDIN_Handler::register_thread_exit_hook (void)
int
STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *)
{
- ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) STDIN thread has exited.\n"));
ACE_ASSERT (this->thr_handle_ == si->si_handle_);
ACE_ReactorEx::end_event_loop();
return 0;
@@ -397,6 +403,10 @@ STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *)
int
main (int argc, char *argv[])
{
+ // Let the proactor know that it will be used with ReactorEx
+ ACE_Proactor proactor (0, 0, 1);
+ ACE_Proactor::instance (&proactor);
+
// Open handler for remote peer communications this will run from
// the main thread.
Peer_Handler peer_handler (argc, argv);