summaryrefslogtreecommitdiff
path: root/examples/Reactor/WFMO_Reactor/test_talker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'examples/Reactor/WFMO_Reactor/test_talker.cpp')
-rw-r--r--examples/Reactor/WFMO_Reactor/test_talker.cpp192
1 files changed, 96 insertions, 96 deletions
diff --git a/examples/Reactor/WFMO_Reactor/test_talker.cpp b/examples/Reactor/WFMO_Reactor/test_talker.cpp
index 0a52bf3ea9e..83eedf58c97 100644
--- a/examples/Reactor/WFMO_Reactor/test_talker.cpp
+++ b/examples/Reactor/WFMO_Reactor/test_talker.cpp
@@ -4,7 +4,7 @@
//
// = LIBRARY
// examples
-//
+//
// = FILENAME
// test_talker.cpp
//
@@ -15,13 +15,13 @@
// ^C events, reading from STDIN, vanilla Win32 events, thread
// exits, Reactor notifications, proactive reads, and proactive
// writes.
-//
+//
// The proactive I/O events are demultiplexed by the ACE_Proactor.
// The thread exits, notications, and vanilla Win32 events are
// demultiplexed by the ACE_Reactor. To enable a single thread
// to run all these events, the Proactor is integrated with the
// Reactor.
-//
+//
// The test application prototypes a simple talk program. Two
// instances of the application connect. Input from either console
// is displayed on the others console also. Because of the evils
@@ -29,46 +29,46 @@
// To test the Proactor and Reactor, I/O between the remote
// processes is performed proactively and interactions between the
// STDIN thread and the main thread are performed reactively.
-//
+//
// The following description of the test application is in two
// parts. The participants section explains the main components
// involved in the application. The collaboration section
// describes how the partipants interact in response to the
// multiple event types which occur.
-//
+//
// The Reactor test application has the following participants:
-//
+//
// . Reactor -- The Reactor demultiplexes Win32 "waitable"
// events using WaitForMultipleObjects.
-//
+//
// . Proactor -- The proactor initiates and demultiplexes
// overlapped I/O operations. The Proactor registers with the
// Reactor so that a single-thread can demultiplex all
// application events.
-//
+//
// . STDIN_Handler -- STDIN_Handler is an Active Object which reads
// from STDIN and forwards the input to the Peer_Handler. This
// runs in a separate thread to make the test more interesting.
// However, STDIN is "waitable", so in general it can be waited on
// by the ACE Reactor, thanks MicroSlush!
-//
+//
// . Peer_Handler -- The Peer_Handler connects to another instance
// of test_reactor. It Proactively reads and writes data to the
// peer. When the STDIN_Handler gives it messages, it fowards them
// to the remote peer. When it receives messages from the remote
// peer, it prints the output to the console.
-//
+//
// The collaborations of the participants are as follows:
-//
+//
// . Initialization
-//
+//
// Peer_Handler -- connects to the remote peer. It then begins
// proactively reading from the remote connection. Note that it
// will be notified by the Proactor when a read completes. It
// also registers a notification strategy with message queue so
// that it is notified when the STDIN_Handler posts a message
// onto the queue.
-//
+//
// STDIN_Handler -- STDIN_Handler registers a signal handler for
// SIGINT. This just captures the exception so that the kernel
// doesn't kill our process; We want to exit gracefully. It also
@@ -76,29 +76,29 @@
// STDIN_Handler's thread handle with the Reactor. The
// Exit_Hook will get called back when the STDIN_Handler thread
// exits. After registering these, it blocks reading from STDIN.
-//
+//
// Proactor -- is registered with the Reactor.
-//
+//
// The main thread of control waits in the Reactor.
-//
+//
// . STDIN events -- When the STDIN_Handler thread reads from
// STDIN, it puts the message on Peer_Handler's message queue. It
// then returns to reading from STDIN.
-//
+//
// . Message enqueue -- The Reactor thread wakes up and calls
// Peer_Handler::handle_output. The Peer_Handler then tries to
// dequeue a message from its message queue. If it can, the
// message is Proactively sent to the remote peer. Note that the
// Peer_Handler will be notified with this operation is complete.
// The Peer_Handler then falls back into the Reactor event loop.
-//
+//
// . Send complete event -- When a proactive send is complete, the
// Proactor is notified by the Reactor. The Proactor, in turn,
// notifies the Peer_Handler. The Peer_Handler then checks for
// more messages from the message queue. If there are any, it
// tries to send them. If there are not, it returns to the
-// Reactor event loop.
-//
+// Reactor event loop.
+//
// . Read complete event -- When a proactive read is complete (the
// Peer_Handler initiated a proactive read when it connected to the
// remote peer), the Proactor is notified by the Reactor. The
@@ -108,18 +108,18 @@
// connection. If the read failed (i.e. the remote peer exited),
// the Peer_Handler sets a flag to end the event loop and returns.
// This will cause the application to exit.
-//
+//
// . ^C events -- When the user types ^C at the console, the
// STDIN_Handler's signal handler will be called. It does nothing,
// but as a result of the signal, the STDIN_Handler thread will
// exit.
-//
+//
// . STDIN_Handler thread exits -- The Exit_Hook will get called
// back from the Reactor. Exit_Hook::handle_signal sets a flag
// to end the event loop and returns. This will cause the
// application to exit.
-//
-//
+//
+//
// To run example, start an instance of the test with an optional
// local port argument (as the acceptor). Start the other instance
// with -h <hostname> and -p <server port>. Type in either the
@@ -127,9 +127,9 @@
// other window. Control C to exit.
//
// = AUTHOR
-// Tim Harrison
+// Tim Harrison
// Irfan Pyarali
-//
+//
// ============================================================================
#include "ace/Reactor.h"
@@ -142,13 +142,13 @@
#include "ace/Synch.h"
#include "ace/Task.h"
-ACE_RCSID(WFMO_Reactor, test_talker, "$Id$")
+ACE_RCSID(ReactorEx, test_talker, "$Id$")
typedef ACE_Task<ACE_MT_SYNCH> MT_TASK;
class Peer_Handler : public MT_TASK, public ACE_Handler
// = TITLE
-// Connect to a server. Receive messages from STDIN_Handler
+// Connect to a server. Receive messages from STDIN_Handler
// and forward them to the server using proactive I/O.
{
public:
@@ -162,13 +162,13 @@ public:
// hostname was specified from the command line.
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
- // This method will be called when an asynchronous read completes on a stream.
+ // This method will be called when an asynchronous read completes on a stream.
// The remote peer has sent us something. If it succeeded, print
// out the message and reinitiate a read. Otherwise, fail. In both
// cases, delete the message sent.
virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
- // This method will be called when an asynchronous write completes on a strea_m.
+ // This method will be called when an asynchronous write completes on a strea_m.
// One of our asynchronous writes to the remote peer has completed.
// Make sure it succeeded and then delete the message.
@@ -250,9 +250,9 @@ private:
Peer_Handler::Peer_Handler (int argc, char *argv[])
: host_ (0),
port_ (ACE_DEFAULT_SERVER_PORT),
- strategy_ (ACE_Reactor::instance (),
- this,
- ACE_Event_Handler::WRITE_MASK),
+ strategy_ (ACE_Reactor::instance (),
+ this,
+ ACE_Event_Handler::WRITE_MASK),
mb_ (BUFSIZ)
{
// This code sets up the message to notify us when a new message is
@@ -266,14 +266,14 @@ Peer_Handler::Peer_Handler (int argc, char *argv[])
while ((c = get_opt ()) != EOF)
{
switch (c)
- {
- case 'h':
- host_ = get_opt.optarg;
- break;
- case 'p':
- port_ = ACE_OS::atoi (get_opt.optarg);
- break;
- }
+ {
+ case 'h':
+ host_ = get_opt.optarg;
+ break;
+ case 'p':
+ port_ = ACE_OS::atoi (get_opt.optarg);
+ break;
+ }
}
}
@@ -285,7 +285,7 @@ Peer_Handler::~Peer_Handler (void)
// does blocking connects and accepts depending on whether a hostname
// was specified from the command line.
-int
+int
Peer_Handler::open (void *)
{
if (host_ != 0) // Connector
@@ -295,7 +295,7 @@ Peer_Handler::open (void *)
// Establish connection with server.
if (connector.connect (stream_, addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1);
ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n"));
}
@@ -305,8 +305,8 @@ Peer_Handler::open (void *)
ACE_INET_Addr local_addr (port_);
if ((acceptor.open (local_addr) == -1) ||
- (acceptor.accept (this->stream_) == -1))
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1);
+ (acceptor.accept (this->stream_) == -1))
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1);
ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n"));
}
@@ -314,26 +314,26 @@ Peer_Handler::open (void *)
int result = this->rd_stream_.open (*this);
if (result != 0)
return result;
-
+
result = this->wr_stream_.open (*this);
if (result != 0)
return result;
result = this->rd_stream_.read (this->mb_,
- this->mb_.size ());
- return result;
+ this->mb_.size ());
+ return result;
}
// One of our asynchronous writes to the remote peer has completed.
// Make sure it succeeded and then delete the message.
-void
+void
Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
if (result.bytes_transferred () <= 0)
ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed",
- result.bytes_transferred ()));
-
+ result.bytes_transferred ()));
+
// This was allocated by the STDIN_Handler, queued, dequeued, passed
// to the proactor, and now passed back to us.
result.message_block ().release ();
@@ -343,11 +343,11 @@ Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result
// out the message and reinitiate a read. Otherwise, fail. In both
// cases, delete the message sent.
-
-void
+
+void
Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
- if (result.bytes_transferred () > 0 &&
+ if (result.bytes_transferred () > 0 &&
this->mb_.length () > 0)
{
this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0';
@@ -363,24 +363,24 @@ Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
return;
}
- // Reset pointers
+ // Reset pointers
this->mb_.wr_ptr (this->mb_.wr_ptr () - result.bytes_transferred ());
// Start off another read
if (this->rd_stream_.read (this->mb_,
- this->mb_.size ()) == -1)
+ this->mb_.size ()) == -1)
ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler"));
}
// This is so the Proactor can get our handle.
-ACE_HANDLE
+ACE_HANDLE
Peer_Handler::handle (void) const
{
return this->stream_.get_handle ();
}
// We've been removed from the Reactor.
-int
+int
Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
{
ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n"));
@@ -388,7 +388,7 @@ Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
}
// New stuff added to the message queue. Try to dequeue a message.
-int
+int
Peer_Handler::handle_output (ACE_HANDLE fd)
{
ACE_Message_Block *mb;
@@ -399,13 +399,13 @@ Peer_Handler::handle_output (ACE_HANDLE fd)
if (this->getq (mb, &tv) != -1)
{
if (this->wr_stream_.write (*mb,
- mb->length ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1);
+ mb->length ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1);
}
return 0;
}
-void
+void
STDIN_Handler::handler (int signum)
{
ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum));
@@ -435,7 +435,7 @@ STDIN_Handler::open (void *)
// Shut down.
-int
+int
STDIN_Handler::close (u_long)
{
ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n"));
@@ -444,38 +444,38 @@ STDIN_Handler::close (u_long)
// Thread runs here.
-int
+int
STDIN_Handler::svc (void)
{
this->register_thread_exit_hook ();
-
+
for (;;)
{
ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ);
// Read from stdin into mb.
- int read_result = ACE_OS::read (ACE_STDIN,
- mb->rd_ptr (),
- mb->size ());
+ int read_result = ACE_OS::read (ACE_STDIN,
+ mb->rd_ptr (),
+ mb->size ());
// If read succeeds, put mb to peer handler, else end the loop.
if (read_result > 0)
- {
- mb->wr_ptr (read_result);
- // Note that this call will first enqueue mb onto the peer
- // handler's message queue, which will then turn around and
- // notify the Reactor via the Notification_Strategy. This
- // will subsequently signal the Peer_Handler, which will
- // react by calling back to its handle_output() method,
- // which dequeues the message and sends it to the peer
- // across the network.
- this->ph_.putq (mb);
- }
- else
- {
- mb->release ();
- break;
- }
+ {
+ mb->wr_ptr (read_result);
+ // Note that this call will first enqueue mb onto the peer
+ // handler's message queue, which will then turn around and
+ // notify the Reactor via the Notification_Strategy. This
+ // will subsequently signal the Peer_Handler, which will
+ // react by calling back to its handle_output() method,
+ // which dequeues the message and sends it to the peer
+ // across the network.
+ this->ph_.putq (mb);
+ }
+ else
+ {
+ mb->release ();
+ break;
+ }
}
// handle_signal will get called on the main proactor thread since
@@ -483,7 +483,7 @@ STDIN_Handler::svc (void)
return 0;
}
-// Register an exit hook with the reactor.
+// Register an exit hook with the reactor.
void
STDIN_Handler::register_thread_exit_hook (void)
@@ -501,7 +501,7 @@ STDIN_Handler::register_thread_exit_hook (void)
// The STDIN thread has exited. This means the user hit ^C. We can
// end the event loop and delete ourself.
-int
+int
STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *)
{
ACE_ASSERT (this->thr_handle_ == si->si_handle_);
@@ -525,34 +525,34 @@ main (int argc, char *argv[])
Peer_Handler peer_handler (argc, argv);
if (peer_handler.open () == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p open failed, errno = %d.\n",
- "peer_handler", errno), 0);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p open failed, errno = %d.\n",
+ "peer_handler", errno), 0);
// Open active object for reading from stdin.
STDIN_Handler stdin_handler (peer_handler);
// Spawn thread.
if (stdin_handler.open () == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p open failed, errno = %d.\n",
- "stdin_handler", errno), 0);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p open failed, errno = %d.\n",
+ "stdin_handler", errno), 0);
// Register proactor with Reactor so that we can demultiplex
// "waitable" events and I/O operations from a single thread.
- if (ACE_Reactor::instance ()->register_handler
+ if (ACE_Reactor::instance ()->register_handler
(ACE_Proactor::instance ()->implementation ()) != 0)
ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n",
- argv[0]), -1);
+ argv[0]), -1);
// Run main event demultiplexor.
ACE_Reactor::run_event_loop ();
// Remove proactor with Reactor.
- if (ACE_Reactor::instance ()->remove_handler
+ if (ACE_Reactor::instance ()->remove_handler
(ACE_Proactor::instance (), ACE_Event_Handler::DONT_CALL) != 0)
ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n",
- argv[0]), -1);
+ argv[0]), -1);
return 0;
}