diff options
Diffstat (limited to 'examples/Reactor/WFMO_Reactor/test_talker.cpp')
-rw-r--r-- | examples/Reactor/WFMO_Reactor/test_talker.cpp | 192 |
1 files changed, 96 insertions, 96 deletions
diff --git a/examples/Reactor/WFMO_Reactor/test_talker.cpp b/examples/Reactor/WFMO_Reactor/test_talker.cpp index 0a52bf3ea9e..83eedf58c97 100644 --- a/examples/Reactor/WFMO_Reactor/test_talker.cpp +++ b/examples/Reactor/WFMO_Reactor/test_talker.cpp @@ -4,7 +4,7 @@ // // = LIBRARY // examples -// +// // = FILENAME // test_talker.cpp // @@ -15,13 +15,13 @@ // ^C events, reading from STDIN, vanilla Win32 events, thread // exits, Reactor notifications, proactive reads, and proactive // writes. -// +// // The proactive I/O events are demultiplexed by the ACE_Proactor. // The thread exits, notications, and vanilla Win32 events are // demultiplexed by the ACE_Reactor. To enable a single thread // to run all these events, the Proactor is integrated with the // Reactor. -// +// // The test application prototypes a simple talk program. Two // instances of the application connect. Input from either console // is displayed on the others console also. Because of the evils @@ -29,46 +29,46 @@ // To test the Proactor and Reactor, I/O between the remote // processes is performed proactively and interactions between the // STDIN thread and the main thread are performed reactively. -// +// // The following description of the test application is in two // parts. The participants section explains the main components // involved in the application. The collaboration section // describes how the partipants interact in response to the // multiple event types which occur. -// +// // The Reactor test application has the following participants: -// +// // . Reactor -- The Reactor demultiplexes Win32 "waitable" // events using WaitForMultipleObjects. -// +// // . Proactor -- The proactor initiates and demultiplexes // overlapped I/O operations. The Proactor registers with the // Reactor so that a single-thread can demultiplex all // application events. -// +// // . STDIN_Handler -- STDIN_Handler is an Active Object which reads // from STDIN and forwards the input to the Peer_Handler. This // runs in a separate thread to make the test more interesting. // However, STDIN is "waitable", so in general it can be waited on // by the ACE Reactor, thanks MicroSlush! -// +// // . Peer_Handler -- The Peer_Handler connects to another instance // of test_reactor. It Proactively reads and writes data to the // peer. When the STDIN_Handler gives it messages, it fowards them // to the remote peer. When it receives messages from the remote // peer, it prints the output to the console. -// +// // The collaborations of the participants are as follows: -// +// // . Initialization -// +// // Peer_Handler -- connects to the remote peer. It then begins // proactively reading from the remote connection. Note that it // will be notified by the Proactor when a read completes. It // also registers a notification strategy with message queue so // that it is notified when the STDIN_Handler posts a message // onto the queue. -// +// // STDIN_Handler -- STDIN_Handler registers a signal handler for // SIGINT. This just captures the exception so that the kernel // doesn't kill our process; We want to exit gracefully. It also @@ -76,29 +76,29 @@ // STDIN_Handler's thread handle with the Reactor. The // Exit_Hook will get called back when the STDIN_Handler thread // exits. After registering these, it blocks reading from STDIN. -// +// // Proactor -- is registered with the Reactor. -// +// // The main thread of control waits in the Reactor. -// +// // . STDIN events -- When the STDIN_Handler thread reads from // STDIN, it puts the message on Peer_Handler's message queue. It // then returns to reading from STDIN. -// +// // . Message enqueue -- The Reactor thread wakes up and calls // Peer_Handler::handle_output. The Peer_Handler then tries to // dequeue a message from its message queue. If it can, the // message is Proactively sent to the remote peer. Note that the // Peer_Handler will be notified with this operation is complete. // The Peer_Handler then falls back into the Reactor event loop. -// +// // . Send complete event -- When a proactive send is complete, the // Proactor is notified by the Reactor. The Proactor, in turn, // notifies the Peer_Handler. The Peer_Handler then checks for // more messages from the message queue. If there are any, it // tries to send them. If there are not, it returns to the -// Reactor event loop. -// +// Reactor event loop. +// // . Read complete event -- When a proactive read is complete (the // Peer_Handler initiated a proactive read when it connected to the // remote peer), the Proactor is notified by the Reactor. The @@ -108,18 +108,18 @@ // connection. If the read failed (i.e. the remote peer exited), // the Peer_Handler sets a flag to end the event loop and returns. // This will cause the application to exit. -// +// // . ^C events -- When the user types ^C at the console, the // STDIN_Handler's signal handler will be called. It does nothing, // but as a result of the signal, the STDIN_Handler thread will // exit. -// +// // . STDIN_Handler thread exits -- The Exit_Hook will get called // back from the Reactor. Exit_Hook::handle_signal sets a flag // to end the event loop and returns. This will cause the // application to exit. -// -// +// +// // To run example, start an instance of the test with an optional // local port argument (as the acceptor). Start the other instance // with -h <hostname> and -p <server port>. Type in either the @@ -127,9 +127,9 @@ // other window. Control C to exit. // // = AUTHOR -// Tim Harrison +// Tim Harrison // Irfan Pyarali -// +// // ============================================================================ #include "ace/Reactor.h" @@ -142,13 +142,13 @@ #include "ace/Synch.h" #include "ace/Task.h" -ACE_RCSID(WFMO_Reactor, test_talker, "$Id$") +ACE_RCSID(ReactorEx, test_talker, "$Id$") typedef ACE_Task<ACE_MT_SYNCH> MT_TASK; class Peer_Handler : public MT_TASK, public ACE_Handler // = TITLE -// Connect to a server. Receive messages from STDIN_Handler +// Connect to a server. Receive messages from STDIN_Handler // and forward them to the server using proactive I/O. { public: @@ -162,13 +162,13 @@ public: // hostname was specified from the command line. virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); - // This method will be called when an asynchronous read completes on a stream. + // This method will be called when an asynchronous read completes on a stream. // The remote peer has sent us something. If it succeeded, print // out the message and reinitiate a read. Otherwise, fail. In both // cases, delete the message sent. virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); - // This method will be called when an asynchronous write completes on a strea_m. + // This method will be called when an asynchronous write completes on a strea_m. // One of our asynchronous writes to the remote peer has completed. // Make sure it succeeded and then delete the message. @@ -250,9 +250,9 @@ private: Peer_Handler::Peer_Handler (int argc, char *argv[]) : host_ (0), port_ (ACE_DEFAULT_SERVER_PORT), - strategy_ (ACE_Reactor::instance (), - this, - ACE_Event_Handler::WRITE_MASK), + strategy_ (ACE_Reactor::instance (), + this, + ACE_Event_Handler::WRITE_MASK), mb_ (BUFSIZ) { // This code sets up the message to notify us when a new message is @@ -266,14 +266,14 @@ Peer_Handler::Peer_Handler (int argc, char *argv[]) while ((c = get_opt ()) != EOF) { switch (c) - { - case 'h': - host_ = get_opt.optarg; - break; - case 'p': - port_ = ACE_OS::atoi (get_opt.optarg); - break; - } + { + case 'h': + host_ = get_opt.optarg; + break; + case 'p': + port_ = ACE_OS::atoi (get_opt.optarg); + break; + } } } @@ -285,7 +285,7 @@ Peer_Handler::~Peer_Handler (void) // does blocking connects and accepts depending on whether a hostname // was specified from the command line. -int +int Peer_Handler::open (void *) { if (host_ != 0) // Connector @@ -295,7 +295,7 @@ Peer_Handler::open (void *) // Establish connection with server. if (connector.connect (stream_, addr) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n")); } @@ -305,8 +305,8 @@ Peer_Handler::open (void *) ACE_INET_Addr local_addr (port_); if ((acceptor.open (local_addr) == -1) || - (acceptor.accept (this->stream_) == -1)) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); + (acceptor.accept (this->stream_) == -1)) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n")); } @@ -314,26 +314,26 @@ Peer_Handler::open (void *) int result = this->rd_stream_.open (*this); if (result != 0) return result; - + result = this->wr_stream_.open (*this); if (result != 0) return result; result = this->rd_stream_.read (this->mb_, - this->mb_.size ()); - return result; + this->mb_.size ()); + return result; } // One of our asynchronous writes to the remote peer has completed. // Make sure it succeeded and then delete the message. -void +void Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { if (result.bytes_transferred () <= 0) ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed", - result.bytes_transferred ())); - + result.bytes_transferred ())); + // This was allocated by the STDIN_Handler, queued, dequeued, passed // to the proactor, and now passed back to us. result.message_block ().release (); @@ -343,11 +343,11 @@ Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result // out the message and reinitiate a read. Otherwise, fail. In both // cases, delete the message sent. - -void + +void Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { - if (result.bytes_transferred () > 0 && + if (result.bytes_transferred () > 0 && this->mb_.length () > 0) { this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0'; @@ -363,24 +363,24 @@ Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) return; } - // Reset pointers + // Reset pointers this->mb_.wr_ptr (this->mb_.wr_ptr () - result.bytes_transferred ()); // Start off another read if (this->rd_stream_.read (this->mb_, - this->mb_.size ()) == -1) + this->mb_.size ()) == -1) ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler")); } // This is so the Proactor can get our handle. -ACE_HANDLE +ACE_HANDLE Peer_Handler::handle (void) const { return this->stream_.get_handle (); } // We've been removed from the Reactor. -int +int Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) { ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n")); @@ -388,7 +388,7 @@ Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) } // New stuff added to the message queue. Try to dequeue a message. -int +int Peer_Handler::handle_output (ACE_HANDLE fd) { ACE_Message_Block *mb; @@ -399,13 +399,13 @@ Peer_Handler::handle_output (ACE_HANDLE fd) if (this->getq (mb, &tv) != -1) { if (this->wr_stream_.write (*mb, - mb->length ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1); + mb->length ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1); } return 0; } -void +void STDIN_Handler::handler (int signum) { ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum)); @@ -435,7 +435,7 @@ STDIN_Handler::open (void *) // Shut down. -int +int STDIN_Handler::close (u_long) { ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n")); @@ -444,38 +444,38 @@ STDIN_Handler::close (u_long) // Thread runs here. -int +int STDIN_Handler::svc (void) { this->register_thread_exit_hook (); - + for (;;) { ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); // Read from stdin into mb. - int read_result = ACE_OS::read (ACE_STDIN, - mb->rd_ptr (), - mb->size ()); + int read_result = ACE_OS::read (ACE_STDIN, + mb->rd_ptr (), + mb->size ()); // If read succeeds, put mb to peer handler, else end the loop. if (read_result > 0) - { - mb->wr_ptr (read_result); - // Note that this call will first enqueue mb onto the peer - // handler's message queue, which will then turn around and - // notify the Reactor via the Notification_Strategy. This - // will subsequently signal the Peer_Handler, which will - // react by calling back to its handle_output() method, - // which dequeues the message and sends it to the peer - // across the network. - this->ph_.putq (mb); - } - else - { - mb->release (); - break; - } + { + mb->wr_ptr (read_result); + // Note that this call will first enqueue mb onto the peer + // handler's message queue, which will then turn around and + // notify the Reactor via the Notification_Strategy. This + // will subsequently signal the Peer_Handler, which will + // react by calling back to its handle_output() method, + // which dequeues the message and sends it to the peer + // across the network. + this->ph_.putq (mb); + } + else + { + mb->release (); + break; + } } // handle_signal will get called on the main proactor thread since @@ -483,7 +483,7 @@ STDIN_Handler::svc (void) return 0; } -// Register an exit hook with the reactor. +// Register an exit hook with the reactor. void STDIN_Handler::register_thread_exit_hook (void) @@ -501,7 +501,7 @@ STDIN_Handler::register_thread_exit_hook (void) // The STDIN thread has exited. This means the user hit ^C. We can // end the event loop and delete ourself. -int +int STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) { ACE_ASSERT (this->thr_handle_ == si->si_handle_); @@ -525,34 +525,34 @@ main (int argc, char *argv[]) Peer_Handler peer_handler (argc, argv); if (peer_handler.open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p open failed, errno = %d.\n", - "peer_handler", errno), 0); + ACE_ERROR_RETURN ((LM_ERROR, + "%p open failed, errno = %d.\n", + "peer_handler", errno), 0); // Open active object for reading from stdin. STDIN_Handler stdin_handler (peer_handler); // Spawn thread. if (stdin_handler.open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p open failed, errno = %d.\n", - "stdin_handler", errno), 0); + ACE_ERROR_RETURN ((LM_ERROR, + "%p open failed, errno = %d.\n", + "stdin_handler", errno), 0); // Register proactor with Reactor so that we can demultiplex // "waitable" events and I/O operations from a single thread. - if (ACE_Reactor::instance ()->register_handler + if (ACE_Reactor::instance ()->register_handler (ACE_Proactor::instance ()->implementation ()) != 0) ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", - argv[0]), -1); + argv[0]), -1); // Run main event demultiplexor. ACE_Reactor::run_event_loop (); // Remove proactor with Reactor. - if (ACE_Reactor::instance ()->remove_handler + if (ACE_Reactor::instance ()->remove_handler (ACE_Proactor::instance (), ACE_Event_Handler::DONT_CALL) != 0) ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", - argv[0]), -1); + argv[0]), -1); return 0; } |