diff options
Diffstat (limited to 'ACE/examples/Reactor/Proactor/test_cancel.cpp')
-rw-r--r-- | ACE/examples/Reactor/Proactor/test_cancel.cpp | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/Proactor/test_cancel.cpp b/ACE/examples/Reactor/Proactor/test_cancel.cpp new file mode 100644 index 00000000000..49c0c0a6490 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_cancel.cpp @@ -0,0 +1,246 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_cancel.cpp +// +// = DESCRIPTION +// This program tests cancelling an Asynchronous Operation in the +// Proactor framework. +// +// This tests accepts a connection and issues an Asynchronous Read +// Stream. It reads <read_size> (option -s) number of bytes and +// when this operation completes, it issues another Asynchronous +// Read Stream to <read_size> and immediately calls <cancel> to +// cancel the operation and so the program exits closing the +// connection. +// +// Works fine on NT. On Solaris platforms, the asynch read is +// pending, but the cancel returns with the value <AIO_ALLDONE> +// indicating all the operations in that handle are done. +// But, LynxOS has a good <aio_cancel> implementation. It works +// fine. +// +// = RUN +// ./test_cancel -p <port_number> +// Then telnet to this port and send <read_size> bytes and your +// connection should get closed down. +// +// = AUTHOR +// Irfan Pyarali (irfan@cs.wustl.edu) +// +// ============================================================================ + +#include "ace/OS_main.h" +#include "ace/Service_Config.h" +#include "ace/Proactor.h" +#include "ace/Asynch_IO.h" +#include "ace/Asynch_IO_Impl.h" +#include "ace/Asynch_Acceptor.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" +#include "ace/Message_Block.h" +#include "ace/Get_Opt.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_sys_socket.h" + +ACE_RCSID (Proactor, test_proactor, "$Id$") + +#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) + // This only works on Win32 platforms and on Unix platforms supporting + // POSIX aio calls. + +#include "test_cancel.h" + +static u_short port = ACE_DEFAULT_SERVER_PORT; +static int done = 0; +static int read_size = 2; + + +Receiver::Receiver (void) + : mb_ (read_size + 1), + handle_ (ACE_INVALID_HANDLE) +{ +} + +Receiver::~Receiver (void) +{ + ACE_DEBUG ((LM_DEBUG, + "Receiver: Closing down Remote connection:%d\n", + this->handle_)); + + ACE_OS::closesocket (this->handle_); +} + +void +Receiver::open (ACE_HANDLE handle, + ACE_Message_Block &) +{ + // New connection, initiate stuff + + ACE_DEBUG ((LM_DEBUG, "%N:%l:Receiver::open called\n")); + + // Cache the new connection + this->handle_ = handle; + + // Initiate ACE_Asynch_Read_Stream + if (this->rs_.open (*this, this->handle_) == -1) + { + ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::open")); + return; + } + + // Try to read <n> bytes from the stream. + + ACE_DEBUG ((LM_DEBUG, + "Receiver::open: Issuing Asynch Read of (%d) bytes from the stream\n", + read_size)); + + if (this->rs_.read (this->mb_, + read_size) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "Receiver::open: Failed to issue the read")); +} + +void +Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); + + // Reset pointers + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); + + if (result.success () && !result.error ()) + { + // Successful read: No error. + + // Set the pointers back in the message block. + result.message_block ().wr_ptr (result.message_block ().rd_ptr ()); + + // Issue another read, but immediately cancel it. + + // Issue the read. + + ACE_DEBUG ((LM_DEBUG, + "Issuing Asynch Read of (%d) bytes from the stream\n", + read_size)); + + if (this->rs_.read (this->mb_, + read_size) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "Receiver::handle_read_stream: Failed to issue the read")); + + // Cancel the read. + + ACE_DEBUG ((LM_DEBUG, + "Cacelling Asynch Read ")); + + int ret_val = this->rs_.cancel (); + if (ret_val == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "Receiver::handle_read_stream: Failed to cancel the read")); + + ACE_DEBUG ((LM_DEBUG, "Asynch IO : Cancel : Result = %d\n", + ret_val)); + } + else + { + done = 1; + + ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); + + // Print the error message if any. + if (result.error () != 0) + { + errno = result.error (); + + ACE_ERROR ((LM_ERROR, + "%p\n", + "Asynch Read Stream Error: ")); + } + } +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("p:s:")); + int c; + + while ((c = get_opt ()) != EOF) + switch (c) + { + case 'p': + port = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 's': + read_size = ACE_OS::atoi (get_opt.opt_arg ()); + break; + default: + ACE_ERROR ((LM_ERROR, "%p.\n", + "usage :\n" + "-p <port>\n" + "-s <read_size>\n")); + return -1; + } + + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + if (parse_args (argc, argv) == -1) + return -1; + + // Note: acceptor parameterized by the Receiver + ACE_Asynch_Acceptor<Receiver> acceptor; + + // Listening passively. + if (acceptor.open (ACE_INET_Addr (port), + read_size, + 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "ACE:acceptor::open failed\n"), + 1); + + int success = 1; + + while (success > 0 && !done) + // dispatch events + success = ACE_Proactor::instance ()->handle_events (); + + return 0; +} + +#else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */ + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_DEBUG ((LM_DEBUG, + "This example does not work on this platform.\n")); + return 1; +} + +#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */ |