diff options
Diffstat (limited to 'examples/Reactor/WFMO_Reactor/test_reactorEx.cpp')
-rw-r--r-- | examples/Reactor/WFMO_Reactor/test_reactorEx.cpp | 428 |
1 files changed, 0 insertions, 428 deletions
diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp deleted file mode 100644 index 295b36ffda0..00000000000 --- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp +++ /dev/null @@ -1,428 +0,0 @@ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// examples -// -// = FILENAME -// test_reactorEx.cpp -// -// = DESCRIPTION -// This test application tests a wide range of events that can be -// demultiplexed using various ACE utilities. Events used include ^C -// events, reading from STDIN, vanilla Win32 events, thread exits, -// ReactorEx notifications, proactive reads, and proactive writes. -// -// The proactive I/O events are demultiplexed by the ACE_Proactor. -// The thread exits, notications, and vanilla Win32 events are -// demultiplexed by the ACE_ReactorEx. To enable a single thread -// to run all these events, the Proactor is integrated with the -// ReactorEx. -// -// = AUTHOR -// Tim Harrison -// -// ============================================================================ - -#include "ace/ReactorEx.h" -#include "ace/Proactor.h" -#include "ace/SOCK_Connector.h" -#include "ace/SOCK_Acceptor.h" -#include "ace/Get_Opt.h" -#include "ace/Time_Value.h" -#include "ace/Service_Config.h" -#include "ace/Synch.h" -#include "ace/Task.h" - -typedef ACE_Task<ACE_MT_SYNCH> MT_TASK; - -class Peer_Handler : public MT_TASK - // = TITLE - // Connect to a server. Receive messages from STDIN_Handler - // and forward them to the server using proactive I/O. -{ -public: - // = Initialization methods. - Peer_Handler (int argc, char *argv[]); - ~Peer_Handler (void); - - int open (void * =0); - // This method creates the network connection to the remote peer. - // It does blocking connects and accepts depending on whether a - // hostname was specified from the command line. - - virtual int handle_output_complete (ACE_Message_Block *msg, - long bytes_transferred); - // One of our asynchronous writes to the remote peer has completed. - // Make sure it succeeded and then delete the message. - - virtual int handle_input_complete (ACE_Message_Block *msg, - long bytes_transferred); - // The remote peer has sent us something. If it succeeded, print - // out the message and reinitiate a read. Otherwise, fail. In both - // cases, delete the message sent. - - virtual ACE_Message_Block *get_message (void); - // This is so the Proactor can get a message to read into. - - virtual ACE_HANDLE get_handle (void) const; - // This is so the Proactor can get our handle. - - virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); - // We've been removed from the ReactorEx. - - virtual int handle_output (ACE_HANDLE fd); - // Called when output events should start. Note that this is - // automatically invoked by the - // <ACE_ReactorEx_Notificiation_Strategy>. - -private: - ACE_SOCK_Stream stream_; - // Socket that we have connected to the server. - - ACE_ReactorEx_Notification_Strategy strategy_; - // The strategy object that the reactorEx uses to notify us when - // something is added to the queue. - - // = Remote peer info. - char *host_; - // Name of remote host. - - u_short port_; - // Port number for remote host. -}; - -class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH> - // = TITLE - // Active Object. Reads from STDIN and passes message blocks to - // the peer handler. -{ -public: - STDIN_Handler (MT_TASK &ph); - // Initialization. - - virtual int open (void * = 0); - // Activate object. - - virtual int close (u_long = 0); - // Shut down. - - int svc (void); - // Thread runs here as an active object. - -private: - static void handler (int signum); - // Handle a ^C. (Do nothing, this just illustrates how we can catch - // signals along with the other things). - - void register_thread_exit_hook (void); - // Helper function to register with the ReactorEx for thread exit. - - virtual int handle_signal (int index, siginfo_t *, ucontext_t *); - // The STDIN thread has exited. This means the user hit ^C. We can - // end the event loop. - - MT_TASK &ph_; - // Send all input to ph_. - - ACE_HANDLE thr_handle_; - // Handle of our thread. -}; - -Peer_Handler::Peer_Handler (int argc, char *argv[]) - : host_ (0), - port_ (ACE_DEFAULT_SERVER_PORT), - strategy_ (ACE_Service_Config::reactorEx (), - this, - ACE_Event_Handler::WRITE_MASK) -{ - // This code sets up the message to notify us when a new message is - // added to the queue. Actually, the queue notifies ReactorEx which - // then notifies us. - this->msg_queue ()->notification_strategy (&this->strategy_); - - ACE_Get_Opt get_opt (argc, argv, "h:p:"); - int c; - - while ((c = get_opt ()) != EOF) - { - switch (c) - { - case 'h': - host_ = get_opt.optarg; - break; - case 'p': - port_ = ACE_OS::atoi (get_opt.optarg); - break; - } - } -} - -Peer_Handler::~Peer_Handler (void) -{ -} - -// This method creates the network connection to the remote peer. It -// does blocking connects and accepts depending on whether a hostname -// was specified from the command line. - -int -Peer_Handler::open (void *) -{ - if (host_ != 0) // Connector - { - ACE_INET_Addr addr (port_, host_); - ACE_SOCK_Connector connector; - - // Establish connection with server. - if (connector.connect (stream_, addr) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); - - ACE_DEBUG ((LM_DEBUG, "connected.\n")); - } - else // Acceptor - { - ACE_SOCK_Acceptor acceptor; - ACE_INET_Addr local_addr (port_); - - if ((acceptor.open (local_addr) == -1) || - (acceptor.accept (this->stream_) == -1)) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); - - ACE_DEBUG ((LM_DEBUG, "accepted.\n")); - } - - return ACE_Service_Config::proactor ()->initiate - (this, ACE_Event_Handler::READ_MASK); -} - -// One of our asynchronous writes to the remote peer has completed. -// Make sure it succeeded and then delete the message. - -int -Peer_Handler::handle_output_complete (ACE_Message_Block *msg, - long bytes_transferred) -{ - if (bytes_transferred <= 0) - ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed", - bytes_transferred)); - - // This was allocated by the STDIN_Handler, queued, dequeued, - // passed to the proactor, and now passed back to us. - delete msg; - return 0; // Do not reinvoke a send. -} - -// The remote peer has sent us something. If it succeeded, print -// out the message and reinitiate a read. Otherwise, fail. In both -// cases, delete the message sent. - -int -Peer_Handler::handle_input_complete (ACE_Message_Block *msg, - long bytes_transferred) -{ - if (bytes_transferred > 0 && msg->length () > 0) - { - msg->rd_ptr ()[bytes_transferred] = '\0'; - // Print out the message received from the server. - ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ())); - delete msg; - return 1; // Reinvokes the recv() operation! - } - - delete msg; - // If a read failed, we will assume it's because the remote peer - // went away. We will end the event loop. Since we're in the main - // thread, we don't need to do a notify. - ACE_Service_Config::end_reactorEx_event_loop (); - return -1; // Close down. -} - -// This is so the Proactor can get a message to read into. - -ACE_Message_Block * -Peer_Handler::get_message (void) -{ - // An extra byte for NUL termination. - ACE_Message_Block *message = - new ACE_Message_Block (BUFSIZ + 1); - - message->size (BUFSIZ); - return message; -} - -// This is so the Proactor can get our handle. -ACE_HANDLE -Peer_Handler::get_handle (void) const -{ - return this->stream_.get_handle (); -} - -// We've been removed from the ReactorEx. -int -Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) -{ - ACE_DEBUG ((LM_DEBUG, "Peer_Handler closing down\n")); - return 0; -} - -// New stuff added to the message queue. Try to dequeue a message. -int -Peer_Handler::handle_output (ACE_HANDLE fd) -{ - ACE_Message_Block *mb; - - ACE_Time_Value tv (ACE_Time_Value::zero); - - // Forward the message to the remote peer receiver. - if (this->getq (mb, &tv) != -1) - { - if (ACE_Service_Config::proactor ()-> - initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1) - ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler")); - } - return 0; -} - -void -STDIN_Handler::handler (int signum) -{ - ACE_DEBUG ((LM_DEBUG, "signal = %S\n", signum)); -} - -STDIN_Handler::STDIN_Handler (MT_TASK &ph) - : ph_ (ph) -{ - // Register for ^C from the console. We just need to catch the - // exception so that the kernel doesn't kill our process. - // Registering this signal handler just tells the kernel that we - // know what we're doing; to leave us alone. - - ACE_OS::signal (SIGINT, ACE_SignalHandler (STDIN_Handler::handler)); -}; - -// Activate object. - -int -STDIN_Handler::open (void *) -{ - if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1); - - return 0; -} - -// Shut down. - -int -STDIN_Handler::close (u_long) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n")); - return 0; -} - -// Thread runs here. - -int -STDIN_Handler::svc (void) -{ - this->register_thread_exit_hook (); - - for (;;) - { - ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); - - // Read from stdin into mb. - int read_result = ACE_OS::read (ACE_STDIN, - mb->rd_ptr (), - mb->size ()); - - // If read succeeds, put mb to peer handler, else end the loop. - if (read_result > 0) - { - mb->wr_ptr (read_result); - // Note that this call will first enqueue mb onto the peer - // handler's message queue, which will then turn around and - // notify the ReactorEx via the Notification_Strategy. This - // will subsequently signal the Peer_Handler, which will - // react by calling back to its handle_output() method, - // which dequeues the message and sends it to the peer - // across the network. - this->ph_.putq (mb); - } - else - break; - } - - // handle_signal will get called on the main proactor thread since - // we just exited and the main thread is waiting on our thread exit. - return 0; -} - -// Register an exit hook with the reactorEx. - -void -STDIN_Handler::register_thread_exit_hook (void) -{ - ACE_hthread_t handle; - - // Get a real handle to our thread. - ACE_Service_Config::thr_mgr ()->thr_self (handle); - - // Register ourselves to get called back when our thread exits. - - if (ACE_Service_Config::reactorEx ()-> - register_handler (this, handle) == -1) - ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n")); - - // We're in another thread, so we need to notify the ReactorEx so - // that it wakes up and waits on the new set of handles. - ACE_Service_Config::reactorEx ()->notify (); -} - -// The STDIN thread has exited. This means the user hit ^C. We can -// end the event loop and delete ourself. - -int -STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) -{ - ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n")); - ACE_ASSERT (this->thr_handle_ == si->si_handle_); - ACE_Service_Config::end_reactorEx_event_loop (); - return 0; -} - -int -main (int argc, char *argv[]) -{ - // Open handler for remote peer communications this will run from - // the main thread. - Peer_Handler peer_handler (argc, argv); - - if (peer_handler.open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p open failed, errno = %d.\n", - "peer_handler", errno), 0); - - // Open active object for reading from stdin. - STDIN_Handler stdin_handler (peer_handler); - - // Spawn thread. - if (stdin_handler.open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p open failed, errno = %d.\n", - "stdin_handler", errno), 0); - - // Register proactor with ReactorEx so that we can demultiplex - // "waitable" events and I/O operations from a single thread. - if (ACE_Service_Config::reactorEx ()->register_handler - (ACE_Service_Config::proactor ()) != 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", - argv[0]), -1); - - // Run main event demultiplexor. - ACE_Service_Config::run_reactorEx_event_loop (); - - return 0; -} |