diff options
Diffstat (limited to 'examples/Reactor/WFMO_Reactor/test_reactorEx.cpp')
-rw-r--r-- | examples/Reactor/WFMO_Reactor/test_reactorEx.cpp | 444 |
1 files changed, 0 insertions, 444 deletions
diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp deleted file mode 100644 index 62143956f9c..00000000000 --- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp +++ /dev/null @@ -1,444 +0,0 @@ -// ============================================================================ -// $Id$ - -// -// = LIBRARY -// examples -// -// = FILENAME -// test_reactorEx.cpp -// -// = DESCRIPTION -// This test application tests a wide range of events that can be -// demultiplexed using various ACE utilities. Events used include ^C -// events, reading from STDIN, vanilla Win32 events, thread exits, -// ReactorEx notifications, proactive reads, and proactive writes. -// -// The proactive I/O events are demultiplexed by the ACE_Proactor. -// The thread exits, notications, and vanilla Win32 events are -// demultiplexed by the ACE_ReactorEx. To enable a single thread -// to run all these events, the Proactor is integrated with the -// ReactorEx. -// -// = AUTHOR -// Tim Harrison -// -// ============================================================================ - -#include "ace/ReactorEx.h" -#include "ace/Proactor.h" -#include "ace/SOCK_Connector.h" -#include "ace/SOCK_Acceptor.h" -#include "ace/Get_Opt.h" -#include "ace/Time_Value.h" -#include "ace/Service_Config.h" -#include "ace/Synch.h" -#include "ace/Task.h" -#include "ace/OS.h" - -typedef ACE_Task<ACE_MT_SYNCH> MT_TASK; - -class Peer_Handler : public MT_TASK - // = TITLE - // Connect to a server. Receive messages from STDIN_Handler - // and forward them to the server using proactive I/O. -{ -public: - Peer_Handler (int argc, char *argv[]); - - int open (void * =0); - // This method creates the network connection to the remote peer. - // It does blocking connects and accepts depending on whether a - // hostname was specified from the command line. - - virtual int handle_output_complete (ACE_Message_Block *msg, - long bytes_transfered); - // One of our asynchronous writes to the remote peer has completed. - // Make sure it succeeded and then delete the message. - - virtual int handle_input_complete (ACE_Message_Block *msg, - long bytes_transfered); - // The remote peer has sent us something. If it succeeded, print - // out the message and reinitiate a read. Otherwise, fail. In both - // cases, delete the message sent. - - virtual ACE_Message_Block *get_message (void); - // This is so the Proactor can get a message to read into. - - virtual ACE_HANDLE get_handle (void) const; - // This is so the Proactor can get our handle. - - virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); - // We've been removed from the ReactorEx. - - virtual int handle_signal (int index, siginfo_t *, ucontext_t *); - // We've been signaled by the STDIN thread. Try to dequeue a - // message. - - void try_send (void); - // Try to dequeue a message. If successful, send it proactively to - // the remote peer. - - virtual int put (ACE_Message_Block *mb, - ACE_Time_Value *tv = 0); - // Enqueue the new mb and signal the main thread. - -private: - ACE_SOCK_Stream stream_; - // Socket that we have connected to the server. - - ACE_HANDLE new_msg_event_; - // Event that gets signaled when messages arrive. - - // = Remote peer info. - char *host_; - // Name of remote host. - - u_short port_; - // Port number for remote host. - - // = Make Task happy. - int close (u_long) { return 0; } -}; - -class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH> - // = TITLE - // Active Object. Reads from STDIN and passes message blocks to - // the peer handler. -{ -public: - STDIN_Handler (MT_TASK &ph); - - virtual int open (void * = 0); - // Activate object. - - virtual int close (u_long = 0); - // Shut down. - - int svc (void); - // Thread runs here. - -private: - MT_TASK &ph_; - // Send all input to ph_. - - static void handler (int signum); - // Handle a ^C. (Do nothing). - - int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } - // Make Task happy. - - void register_thread_exit_hook (void); - // Helper function to register with the ReactorEx for thread exit. - - virtual int handle_signal (int index, siginfo_t *, ucontext_t *); - // The STDIN thread has exited. This means the user hit ^C. We can - // end the event loop. -}; - -Peer_Handler::Peer_Handler (int argc, char *argv[]) - : host_ (0), - port_ (ACE_DEFAULT_SERVER_PORT) -{ - ACE_Get_Opt get_opt (argc, argv, "h:p:"); - int c; - - while ((c = get_opt ()) != EOF) - { - switch (c) - { - case 'h': - host_ = get_opt.optarg; - break; - case 'p': - port_ = ACE_OS::atoi (get_opt.optarg); - break; - } - } - - // Auto reset event. - new_msg_event_ = ::CreateEvent (NULL, FALSE, FALSE, NULL); - ACE_Service_Config::reactorEx ()-> - register_handler (this, new_msg_event_); -} - -// This method creates the network connection to the remote peer. It -// does blocking connects and accepts depending on whether a hostname -// was specified from the command line. - -int -Peer_Handler::open (void *) -{ - if (host_ != 0) // Connector - { - ACE_INET_Addr addr (port_, host_); - ACE_SOCK_Connector connector; - - // Establish connection with server. - if (connector.connect (stream_, addr) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "connect"), -1); - - ACE_DEBUG ((LM_DEBUG, "connected.\n")); - } - else // Acceptor - { - ACE_SOCK_Acceptor acceptor; - ACE_INET_Addr local_addr (port_); - - if ((acceptor.open (local_addr) == -1) || - (acceptor.accept (this->stream_) == -1)) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept failed"), -1); - - ACE_DEBUG ((LM_DEBUG, "accepted.\n")); - } - - return ACE_Service_Config::proactor ()->initiate - (this, ACE_Event_Handler::READ_MASK); -} - -// One of our asynchronous writes to the remote peer has completed. -// Make sure it succeeded and then delete the message. - -int -Peer_Handler::handle_output_complete (ACE_Message_Block *msg, - long bytes_transfered) -{ - if (bytes_transfered <= 0) - ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed", - bytes_transfered)); - - // This was allocated by the STDIN_Handler, queued, dequeued, - // passed to the proactor, and now passed back to us. - delete msg; - return 0; // Do not reinvoke a send. -} - -// The remote peer has sent us something. If it succeeded, print -// out the message and reinitiate a read. Otherwise, fail. In both -// cases, delete the message sent. - -int -Peer_Handler::handle_input_complete (ACE_Message_Block *msg, - long bytes_transfered) -{ - if ((bytes_transfered > 0) && (msg->length () > 0)) - { - msg->rd_ptr ()[bytes_transfered] = '\0'; - // Print out the message received from the server. - ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ())); - delete msg; - return 1; // Reinvokes the recv() operation! - } - - delete msg; - // If a read failed, we will assume it's because the remote peer - // went away. We will end the event loop. Since we're in the main - // thread, we don't need to do a notify. - ACE_Service_Config::end_reactorEx_event_loop (); - return -1; // Close down. -} - -// This is so the Proactor can get a message to read into. - -ACE_Message_Block * -Peer_Handler::get_message (void) -{ - // An extra byte for NUL termination. - ACE_Message_Block *message = - new ACE_Message_Block (BUFSIZ + 1); - - message->size (BUFSIZ); - return message; -} - -// This is so the Proactor can get our handle. -ACE_HANDLE -Peer_Handler::get_handle (void) const -{ - return this->stream_.get_handle (); -} - -// We've been removed from the ReactorEx. -int -Peer_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) -{ - ACE_DEBUG ((LM_DEBUG, "Peer_Handler closing down\n")); - return 0; -} - -// We've been signaled by the STDIN thread. Try to dequeue a -// message. - -int -Peer_Handler::handle_signal (int, siginfo_t *, ucontext_t *) -{ - this->try_send (); - return 0; -} - -// Try to dequeue a message. If successful, send it proactively to -// the remote peer. - -void -Peer_Handler::try_send (void) -{ - ACE_Message_Block *mb; - - ACE_Time_Value tv (ACE_Time_Value::zero); - - if (this->getq (mb, &tv) != -1) - { - if (ACE_Service_Config::proactor ()-> - initiate (this, ACE_Event_Handler::WRITE_MASK, mb) == -1) - ACE_ERROR ((LM_ERROR, "%p Write initiate.\n", "Peer_Handler")); - } -} - -// Enqueue the new mb and signal the main thread. - -int -Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv) -{ - // Enqueue the mb. - int result = this->putq (mb, tv); - - // Signal the main thread. This will remain signaled until the main - // thread wakes up. - - if (::SetEvent (new_msg_event_) == 0) - ACE_ERROR ((LM_ERROR, "Pulse Failed!\n")); - - return result; -} - -void -STDIN_Handler::handler (int signum) -{ - ACE_DEBUG ((LM_DEBUG, "signal = %S\n", signum)); -} - -STDIN_Handler::STDIN_Handler (MT_TASK &ph) - : ph_ (ph) -{ - // Register for ^C from the console. We just need to catch the - // exception so that the kernel doesn't kill our process. - // Registering this signal handler just tells the kernel that we - // know what we're doing; to leave us alone. - ACE_OS::signal (SIGINT, ACE_SignalHandler (STDIN_Handler::handler)); -}; - -// Activate object. - -int -STDIN_Handler::open (void *) -{ - if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1); - - return 0; -} - -// Shut down. - -int -STDIN_Handler::close (u_long) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting.\n")); - return 0; -} - -// Thread runs here. - -int -STDIN_Handler::svc (void) -{ - this->register_thread_exit_hook (); - - for (;;) - { - ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); - - // Read from stdin into mb. - int read_result = ACE_OS::read (ACE_STDIN, - mb->rd_ptr (), - mb->size ()); - - // If read succeeds, put mb to peer handler, else end the loop. - if (read_result > 0) - { - mb->wr_ptr (read_result); - this->ph_.put (mb); - } - else - break; - } - - // handle_signal will get called. - return 42; -} - -// Register an exit hook with the reactorEx. All this junk is testing -// out how ACE_Thread::self (hthread_id&) doesn't work!! - -void -STDIN_Handler::register_thread_exit_hook (void) -{ - ACE_hthread_t handle; - - // Get a real handle to our thread. - ACE_Service_Config::thr_mgr ()->thr_self (handle); - - // Register ourselves to get called back when our thread exits. - - if (ACE_Service_Config::reactorEx ()-> - register_handler (this, handle) == -1) - ACE_ERROR ((LM_ERROR, "Exit_Hook Register failed.\n")); - - // We're in another thread, so we need to notify the ReactorEx so - // that it wakes up and waits on the new set of handles. - ACE_Service_Config::reactorEx ()->notify (); -} - -// The STDIN thread has exited. This means the user hit ^C. We can -// end the event loop and delete ourself. - -int -STDIN_Handler::handle_signal (int, siginfo_t *, ucontext_t *) -{ - ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n")); - ACE_Service_Config::end_reactorEx_event_loop (); - return 0; -} - -int -main (int argc, char *argv[]) -{ - // Open handler for remote peer communications this will run from - // the main thread. - Peer_Handler peer_handler (argc, argv); - - if (peer_handler.open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p open failed, errno = %d.\n", - "peer_handler", errno), 0); - - // Open active object for reading from stdin. - STDIN_Handler stdin_handler (peer_handler); - - if (stdin_handler.open () == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p open failed, errno = %d.\n", - "stdin_handler", errno), 0); - - // Register proactor with ReactorEx so that we can demultiplex - // "waitable" events and I/O operations from a single thread. - if (ACE_Service_Config::reactorEx ()->register_handler - (ACE_Service_Config::proactor ()) != 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p failed to register Proactor.\n", - argv[0]), -1); - - // Run main event demultiplexor. - ACE_Service_Config::run_reactorEx_event_loop (); - - return 42; -} |