diff options
Diffstat (limited to 'ACE/examples/Reactor/WFMO_Reactor/Talker.cpp')
-rw-r--r-- | ACE/examples/Reactor/WFMO_Reactor/Talker.cpp | 610 |
1 files changed, 610 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/WFMO_Reactor/Talker.cpp b/ACE/examples/Reactor/WFMO_Reactor/Talker.cpp new file mode 100644 index 00000000000..ff6ec9ff0c9 --- /dev/null +++ b/ACE/examples/Reactor/WFMO_Reactor/Talker.cpp @@ -0,0 +1,610 @@ + +//============================================================================= +/** + * @file Talker.cpp + * + * $Id$ + * + * + * This test application tests a wide range of events that can be + * demultiplexed using various ACE utilities. Events used include + * ^C events, reading from STDIN, vanilla Win32 events, thread + * exits, Reactor notifications, proactive reads, and proactive + * writes. + * + * The proactive I/O events are demultiplexed by the ACE_Proactor. + * The thread exits, notications, and vanilla Win32 events are + * demultiplexed by the ACE_Reactor. To enable a single thread + * to run all these events, the Proactor is integrated with the + * Reactor. + * + * The test application prototypes a simple talk program. Two + * instances of the application connect. Input from either console + * is displayed on the others console also. Because of the evils + * of Win32 STDIN, a separate thread is used to read from STDIN. + * To test the Proactor and Reactor, I/O between the remote + * processes is performed proactively and interactions between the + * STDIN thread and the main thread are performed reactively. + * + * The following description of the test application is in two + * parts. The participants section explains the main components + * involved in the application. The collaboration section + * describes how the partipants interact in response to the + * multiple event types which occur. + * + * The Reactor test application has the following participants: + * + * . Reactor -- The Reactor demultiplexes Win32 "waitable" + * events using WaitForMultipleObjects. + * + * . Proactor -- The proactor initiates and demultiplexes + * overlapped I/O operations. The Proactor registers with the + * Reactor so that a single-thread can demultiplex all + * application events. + * + * . STDIN_Handler -- STDIN_Handler is an Active Object which reads + * from STDIN and forwards the input to the Peer_Handler. This + * runs in a separate thread to make the test more interesting. + * However, STDIN is "waitable", so in general it can be waited on + * by the ACE Reactor, thanks MicroSlush! + * + * . Peer_Handler -- The Peer_Handler connects to another instance + * of test_reactor. It Proactively reads and writes data to the + * peer. When the STDIN_Handler gives it messages, it fowards them + * to the remote peer. When it receives messages from the remote + * peer, it prints the output to the console. + * + * The collaborations of the participants are as follows: + * + * . Initialization + * + * Peer_Handler -- connects to the remote peer. It then begins + * proactively reading from the remote connection. Note that it + * will be notified by the Proactor when a read completes. It + * also registers a notification strategy with message queue so + * that it is notified when the STDIN_Handler posts a message + * onto the queue. + * + * STDIN_Handler -- STDIN_Handler registers a signal handler for + * SIGINT. This just captures the exception so that the kernel + * doesn't kill our process; We want to exit gracefully. It also + * creates an Exit_Hook object which registers the + * STDIN_Handler's thread handle with the Reactor. The + * Exit_Hook will get called back when the STDIN_Handler thread + * exits. After registering these, it blocks reading from STDIN. + * + * Proactor -- is registered with the Reactor. + * + * The main thread of control waits in the Reactor. + * + * . STDIN events -- When the STDIN_Handler thread reads from + * STDIN, it puts the message on Peer_Handler's message queue. It + * then returns to reading from STDIN. + * + * . Message enqueue -- The Reactor thread wakes up and calls + * Peer_Handler::handle_output. The Peer_Handler then tries to + * dequeue a message from its message queue. If it can, the + * message is Proactively sent to the remote peer. Note that the + * Peer_Handler will be notified with this operation is complete. + * The Peer_Handler then falls back into the Reactor event loop. + * + * . Send complete event -- When a proactive send is complete, the + * Proactor is notified by the Reactor. The Proactor, in turn, + * notifies the Peer_Handler. The Peer_Handler then checks for + * more messages from the message queue. If there are any, it + * tries to send them. If there are not, it returns to the + * Reactor event loop. + * + * . Read complete event -- When a proactive read is complete (the + * Peer_Handler initiated a proactive read when it connected to the + * remote peer), the Proactor is notified by the Reactor. The + * Proactor, in turn notifies the Peer_Handler. If the read was + * successful the Peer_Handler just displays the received msg to + * the console and reinvokes a proactive read from the network + * connection. If the read failed (i.e. the remote peer exited), + * the Peer_Handler sets a flag to end the event loop and returns. + * This will cause the application to exit. + * + * . ^C events -- When the user types ^C at the console, the + * STDIN_Handler's signal handler will be called. It does nothing, + * but as a result of the signal, the STDIN_Handler thread will + * exit. + * + * . STDIN_Handler thread exits -- The Exit_Hook will get called + * back from the Reactor. Exit_Hook::handle_signal sets a flag + * to end the event loop and returns. This will cause the + * application to exit. + * + * + * To run example, start an instance of the test with an optional + * local port argument (as the acceptor). Start the other instance + * with -h <hostname> and -p <server port>. Type in either the + * client or server windows and your message should show up in the + * other window. Control C to exit. + * + * + * @author Tim Harrison Irfan Pyarali + */ +//============================================================================= + + +#include "ace/OS_main.h" + +#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) + +#include "ace/Reactor.h" +#include "ace/Reactor_Notification_Strategy.h" +#include "ace/WIN32_Proactor.h" +#include "ace/Proactor.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Get_Opt.h" +#include "ace/Service_Config.h" +#include "ace/Task.h" +#include "ace/OS_NS_unistd.h" + + + +typedef ACE_Task<ACE_MT_SYNCH> MT_TASK; + +/** + * @class Peer_Handler + * + * @brief Connect to a server. Receive messages from STDIN_Handler + * and forward them to the server using proactive I/O. + */ +class Peer_Handler : public MT_TASK, public ACE_Handler +{ +public: + // = Initialization methods. + Peer_Handler (int argc, ACE_TCHAR *argv[]); + ~Peer_Handler (void); + + //FUZZ: disable check_for_lack_ACE_OS + /** + * This method creates the network connection to the remote peer. + * It does blocking connects and accepts depending on whether a + * hostname was specified from the command line. + *FUZZ: enable check_for_lack_ACE_OS + */ + int open (void * =0); + + /** + * This method will be called when an asynchronous read completes on a stream. + * The remote peer has sent us something. If it succeeded, print + * out the message and reinitiate a read. Otherwise, fail. In both + * cases, delete the message sent. + */ + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + + /** + * This method will be called when an asynchronous write completes on a strea_m. + * One of our asynchronous writes to the remote peer has completed. + * Make sure it succeeded and then delete the message. + */ + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + + /** + * Get the I/O handle used by this <handler>. This method will be + * called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is + * passed to <open>. + */ + virtual ACE_HANDLE handle (void) const; + + /// Set the ACE_HANDLE value for this Handler. + void handle (ACE_HANDLE); + + /// We've been removed from the Reactor. + virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); + + /** + * Called when output events should start. Note that this is + * automatically invoked by the + * <ACE_Reactor_Notificiation_Strategy>. + */ + virtual int handle_output (ACE_HANDLE fd); + +private: + /// Socket that we have connected to the server. + ACE_SOCK_Stream stream_; + + /// The strategy object that the reactor uses to notify us when + /// something is added to the queue. + ACE_Reactor_Notification_Strategy strategy_; + + // = Remote peer info. + /// Name of remote host. + ACE_TCHAR *host_; + + /// Port number for remote host. + u_short port_; + + /// Read stream + ACE_Asynch_Read_Stream rd_stream_; + + /// Write stream + ACE_Asynch_Write_Stream wr_stream_; + + /// Message Block for reading from the network + ACE_Message_Block mb_; +}; + +/** + * @class STDIN_Handler + * + * @brief Active Object. Reads from STDIN and passes message blocks to + * the peer handler. + */ +class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH> +{ +public: + /// Initialization. + STDIN_Handler (MT_TASK &ph); + + //FUZZ: disable check_for_lack_ACE_OS + /// Activate object. + virtual int open (void * = 0); + + /// Shut down. + ///FUZZ: enable check_for_lack_ACE_OS + virtual int close (u_long = 0); + + /// Thread runs here as an active object. + int svc (void); + + int handle_close (ACE_HANDLE, + ACE_Reactor_Mask); + +private: + /// Handle a ^C. (Do nothing, this just illustrates how we can catch + /// signals along with the other things). + static void handler (int signum); + + /// Helper function to register with the Reactor for thread exit. + void register_thread_exit_hook (void); + + /// The STDIN thread has exited. This means the user hit ^C. We can + /// end the event loop. + virtual int handle_signal (int index, siginfo_t *, ucontext_t *); + + /// Send all input to ph_. + MT_TASK &ph_; + + /// Handle of our thread. + ACE_HANDLE thr_handle_; +}; + +Peer_Handler::Peer_Handler (int argc, ACE_TCHAR *argv[]) + : strategy_ (ACE_Reactor::instance (), + this, + ACE_Event_Handler::WRITE_MASK), + host_ (0), + port_ (ACE_DEFAULT_SERVER_PORT), + mb_ (BUFSIZ) +{ + // This code sets up the message to notify us when a new message is + // added to the queue. Actually, the queue notifies Reactor which + // then notifies us. + this->msg_queue ()->notification_strategy (&this->strategy_); + + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("h:p:")); + int c; + + while ((c = get_opt ()) != EOF) + { + switch (c) + { + case 'h': + host_ = get_opt.opt_arg (); + break; + case 'p': + port_ = ACE_OS::atoi (get_opt.opt_arg ()); + break; + } + } +} + +Peer_Handler::~Peer_Handler (void) +{ +} + +// This method creates the network connection to the remote peer. It +// does blocking connects and accepts depending on whether a hostname +// was specified from the command line. + +int +Peer_Handler::open (void *) +{ + if (host_ != 0) // Connector + { + ACE_INET_Addr addr (port_, host_); + ACE_SOCK_Connector connector; + + // Establish connection with server. + if (connector.connect (stream_, addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); + + ACE_DEBUG ((LM_DEBUG, "(%t) connected.\n")); + } + else // Acceptor + { + ACE_SOCK_Acceptor acceptor; + ACE_INET_Addr local_addr (port_); + + if ((acceptor.open (local_addr) == -1) || + (acceptor.accept (this->stream_) == -1)) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); + + ACE_DEBUG ((LM_DEBUG, "(%t) accepted.\n")); + } + + int result = this->rd_stream_.open (*this); + if (result != 0) + return result; + + result = this->wr_stream_.open (*this); + if (result != 0) + return result; + + result = this->rd_stream_.read (this->mb_, + this->mb_.size ()); + return result; +} + +// One of our asynchronous writes to the remote peer has completed. +// Make sure it succeeded and then delete the message. + +void +Peer_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + if (result.bytes_transferred () <= 0) + ACE_DEBUG ((LM_DEBUG, "(%t) %p bytes = %d\n", "Message failed", + result.bytes_transferred ())); + + // This was allocated by the STDIN_Handler, queued, dequeued, passed + // to the proactor, and now passed back to us. + result.message_block ().release (); +} + +// The remote peer has sent us something. If it succeeded, print +// out the message and reinitiate a read. Otherwise, fail. In both +// cases, delete the message sent. + + +void +Peer_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + if (result.bytes_transferred () > 0 && + this->mb_.length () > 0) + { + this->mb_.rd_ptr ()[result.bytes_transferred ()] = '\0'; + // Print out the message received from the server. + ACE_DEBUG ((LM_DEBUG, "%s", this->mb_.rd_ptr ())); + } + else + { + // If a read failed, we will assume it's because the remote peer + // went away. We will end the event loop. Since we're in the + // main thread, we don't need to do a notify. + ACE_Reactor::end_event_loop(); + return; + } + + // Reset pointers + this->mb_.wr_ptr (this->mb_.wr_ptr () - result.bytes_transferred ()); + + // Start off another read + if (this->rd_stream_.read (this->mb_, + this->mb_.size ()) == -1) + ACE_ERROR ((LM_ERROR, "%p Read initiate.\n", "Peer_Handler")); +} + +// This is so the Proactor can get our handle. +ACE_HANDLE +Peer_Handler::handle (void) const +{ + return this->stream_.get_handle (); +} + +void +Peer_Handler::handle (ACE_HANDLE handle) +{ + this->stream_.set_handle (handle); +} + +// We've been removed from the Reactor. +int +Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler closing down\n")); + return 0; +} + +// New stuff added to the message queue. Try to dequeue a message. +int +Peer_Handler::handle_output (ACE_HANDLE) +{ + ACE_Message_Block *mb = 0; + + ACE_Time_Value tv (ACE_Time_Value::zero); + + // Forward the message to the remote peer receiver. + if (this->getq (mb, &tv) != -1) + { + if (this->wr_stream_.write (*mb, + mb->length ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler"), -1); + } + return 0; +} + +void +STDIN_Handler::handler (int signum) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) signal = %S\n", signum)); +} + +STDIN_Handler::STDIN_Handler (MT_TASK &ph) + : ph_ (ph) +{ + // Register for ^C from the console. We just need to catch the + // exception so that the kernel doesn't kill our process. + // Registering this signal handler just tells the kernel that we + // know what we're doing; to leave us alone. + + ACE_OS::signal (SIGINT, (ACE_SignalHandler) STDIN_Handler::handler); +}; + +// Activate object. + +int +STDIN_Handler::open (void *) +{ + if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1); + + return 0; +} + +// Shut down. + +int +STDIN_Handler::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n")); + return 0; +} + +// Thread runs here. + +int +STDIN_Handler::svc (void) +{ + this->register_thread_exit_hook (); + + for (;;) + { + ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); + + // Read from stdin into mb. + int read_result = ACE_OS::read (ACE_STDIN, + mb->rd_ptr (), + mb->size ()); + + // If read succeeds, put mb to peer handler, else end the loop. + if (read_result > 0) + { + mb->wr_ptr (read_result); + // Note that this call will first enqueue mb onto the peer + // handler's message queue, which will then turn around and + // notify the Reactor via the Notification_Strategy. This + // will subsequently signal the Peer_Handler, which will + // react by calling back to its handle_output() method, + // which dequeues the message and sends it to the peer + // across the network. + this->ph_.putq (mb); + } + else + { + mb->release (); + break; + } + } + + // handle_signal will get called on the main proactor thread since + // we just exited and the main thread is waiting on our thread exit. + return 0; +} + +// Register an exit hook with the reactor. + +void +STDIN_Handler::register_thread_exit_hook (void) +{ + // Get a real handle to our thread. + ACE_Thread_Manager::instance ()->thr_self (this->thr_handle_); + + // Register ourselves to get called back when our thread exits. + + if (ACE_Reactor::instance ()-> + register_handler (this, this->thr_handle_) == -1) + ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n")); +} + +// The STDIN thread has exited. This means the user hit ^C. We can +// end the event loop and delete ourself. + +int +STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) +{ + if (si != 0) + { + ACE_TEST_ASSERT (this->thr_handle_ == si->si_handle_); + ACE_Reactor::end_event_loop (); + } + return 0; +} + +int +STDIN_Handler::handle_close (ACE_HANDLE, + ACE_Reactor_Mask) +{ + delete this; + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + // Let the proactor know that it will be used with Reactor + // Create specific proactor + ACE_WIN32_Proactor win32_proactor (0, 1); + // Get the interface proactor + ACE_Proactor proactor (&win32_proactor); + // Put it as the instance. + ACE_Proactor::instance (&proactor); + + // Open handler for remote peer communications this will run from + // the main thread. + Peer_Handler peer_handler (argc, argv); + + if (peer_handler.open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p open failed, errno = %d.\n", + "peer_handler", errno), 0); + + // Open active object for reading from stdin. + STDIN_Handler *stdin_handler = + new STDIN_Handler (peer_handler); + + // Spawn thread. + if (stdin_handler->open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p open failed, errno = %d.\n", + "stdin_handler", errno), 0); + + // Register proactor with Reactor so that we can demultiplex + // "waitable" events and I/O operations from a single thread. + if (ACE_Reactor::instance ()->register_handler + (ACE_Proactor::instance ()->implementation ()) != 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", + argv[0]), -1); + + // Run main event demultiplexor. + ACE_Reactor::run_event_loop (); + + // Remove proactor with Reactor. + if (ACE_Reactor::instance ()->remove_handler + (ACE_Proactor::instance ()->implementation (), ACE_Event_Handler::DONT_CALL) != 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", + argv[0]), -1); + + return 0; +} +#else /* !ACE_HAS_WIN32_OVERLAPPED_IO */ +int +ACE_TMAIN (int , ACE_TCHAR *[]) +{ + return 0; +} +#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ |