diff options
author | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-08-01 16:43:24 +0000 |
---|---|---|
committer | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-08-01 16:43:24 +0000 |
commit | b462a7a9d307c5f5eff30a24008045c0e3d0185a (patch) | |
tree | a9efcb39b9e041e5cf70e17090c76df8b5354b47 /examples/Reactor/Proactor/test_proactor2.cpp | |
parent | 7e1358bc65c1d08e3ba99af5b7e5869e1cd2c47f (diff) | |
download | ATCD-TAO-1_1_7.tar.gz |
This commit was manufactured by cvs2svn to create tag 'TAO-1_1_7'.TAO-1_1_7
Diffstat (limited to 'examples/Reactor/Proactor/test_proactor2.cpp')
-rw-r--r-- | examples/Reactor/Proactor/test_proactor2.cpp | 806 |
1 files changed, 0 insertions, 806 deletions
diff --git a/examples/Reactor/Proactor/test_proactor2.cpp b/examples/Reactor/Proactor/test_proactor2.cpp deleted file mode 100644 index 1f972d1c018..00000000000 --- a/examples/Reactor/Proactor/test_proactor2.cpp +++ /dev/null @@ -1,806 +0,0 @@ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// examples -// -// = FILENAME -// test_proactor2.cpp -// -// = DESCRIPTION -// Alexander Libman <Alibman@baltimore.com> modified -// <test_proactor> and made this test. Instead of writing received -// data to the file, the receiver sends them back to the -// sender,i.e. ACE_Asynch_Write_File wf_ has been changed to -// ACE_Asynch_Write_Stream wf_. -// -// = AUTHOR -// Irfan Pyarali <irfan@cs.wustl.edu> and Alexander Libman -// <Alibman@baltimore.com>. -// ============================================================================ - -#include "ace/Signal.h" - -#include "ace/Service_Config.h" -#include "ace/Proactor.h" -#include "ace/Asynch_IO.h" -#include "ace/Asynch_IO_Impl.h" -#include "ace/Asynch_Acceptor.h" -#include "ace/INET_Addr.h" -#include "ace/SOCK_Connector.h" -#include "ace/SOCK_Acceptor.h" -#include "ace/SOCK_Stream.h" -#include "ace/Message_Block.h" -#include "ace/Get_Opt.h" -#include "ace/streams.h" - -#include "ace/Task.h" - - -ACE_RCSID(Proactor, test_proactor2, "test_proactor2.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp") - -#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) - // This only works on Win32 platforms and on Unix platforms supporting - // POSIX aio calls. - -#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) - -#include "ace/WIN32_Proactor.h" - -#elif defined (ACE_HAS_AIO_CALLS) - -#include "ace/POSIX_Proactor.h" - -#endif - - // Some debug helper functions - int DisableSignal ( int SigNum ); -int PrintSigMask (); - -#define COUT(X) cout << X ; cout.flush (); - -// Host that we're connecting to. -static char *host = 0; - -// duplex mode: ==0 half-duplex -// !=0 full duplex -static int duplex = 0 ; - -// number threads in the Proactor thread pool -static int nThreads = 1; - -// Port that we're receiving connections on. -static u_short port = ACE_DEFAULT_SERVER_PORT; - -// Size of each initial asynchronous <read> operation. -static int initial_read_size = BUFSIZ; - - -#define MyMutex ACE_Recursive_Thread_Mutex -//#define MyMutex ACE_Thread_Mutex -//#define MyMutex ACE_Null_Mutex - -//-------------------------------------------------------------------------- -// MyTask plays role for Proactor threads pool -//-------------------------------------------------------------------------- -class MyTask: public ACE_Task<ACE_MT_SYNCH> -{ - -public: - - int svc (void) ; -}; - - -int MyTask::svc (void ) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n")); - - while ( ACE_Proactor::event_loop_done () == 0 ) - { - ACE_Proactor::run_event_loop (); - } - - ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n")); - return 0 ; -} - -//----------------------------------------------------------- -// Receiver -//----------------------------------------------------------- -class Receiver : public ACE_Service_Handler -{ -public: - - Receiver (void); - ~Receiver (void); - - virtual void open (ACE_HANDLE handle, - ACE_Message_Block &message_block); - // This is called after the new connection has been accepted. - -protected: - // These methods are called by the framework - - virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result - &result); - // This is called when asynchronous <read> operation from the socket - // complete. - - virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result - &result); - // This is called when an asynchronous <write> to the file - // completes. - -private: - int initiate_read_stream (void); - int initiate_write_stream (ACE_Message_Block & mb, int nBytes ); - bool check_destroy () ; - - ACE_Asynch_Read_Stream rs_; - ACE_Asynch_Write_Stream ws_; - ACE_HANDLE handle_; - MyMutex m_Mtx ; - long nIOCount ; - static long nSessions ; -}; - - -long Receiver::nSessions = 0 ; - -Receiver::Receiver (void) - : handle_ (ACE_INVALID_HANDLE), - nIOCount ( 0 ) -{ - ACE_Guard<MyMutex> locker (m_Mtx) ; - nSessions ++ ; - ACE_DEBUG ((LM_DEBUG, "Receiver Ctor nSessions=%d\n", nSessions )); -} - -Receiver::~Receiver (void) -{ - ACE_Guard<MyMutex> locker (m_Mtx) ; - nSessions -- ; - ACE_OS::closesocket (this->handle_); - ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor nSessions=%d\n", nSessions )); -} - -//--------------------------------------------------------------------- -// return true if we alive, false we commited suicide -// -//--------------------------------------------------------------------- -bool Receiver::check_destroy () -{ - { - ACE_Guard<MyMutex> locker (m_Mtx) ; - - if ( nIOCount > 0 ) - { - return true ; - } - } - - delete this ; - return false ; -} - - -void Receiver::open (ACE_HANDLE handle, - ACE_Message_Block &message_block) -{ - ACE_UNUSED_ARG (message_block); - - ACE_DEBUG ((LM_DEBUG, - "%N:%l:Receiver::open called\n")); - - - this->handle_ = handle; - - if (this->ws_.open (*this, this->handle_ ) == -1) - { - ACE_ERROR ((LM_ERROR, - "%p\n", - "ACE_Asynch_Write_Stream::open")); - - } - else if (this->rs_.open (*this, this->handle_) == -1) - { - ACE_ERROR ((LM_ERROR, - "%p\n", - "ACE_Asynch_Read_Stream::open")); - } - else - { - initiate_read_stream (); - } - - - check_destroy (); -} - -int Receiver::initiate_read_stream (void) -{ - ACE_Guard<MyMutex> locker (m_Mtx) ; - - // Create a new <Message_Block>. Note that this message block will - // be used both to <read> data asynchronously from the socket and to - // <write> data asynchronously to the file. - ACE_DEBUG ((LM_DEBUG, - "initiate_read_stream called\n")); - - - ACE_Message_Block *mb = 0; - ACE_NEW_RETURN (mb, - ACE_Message_Block (BUFSIZ + 1), - -1); - - // Inititiate read - if (this->rs_.read (*mb, mb->size ()- 1) == -1) - { - mb->release () ; - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "ACE_Asynch_Read_Stream::read"), - -1); - } - - nIOCount++ ; - return 0; -} - -int Receiver::initiate_write_stream (ACE_Message_Block & mb, int nBytes ) -{ - ACE_Guard<MyMutex> locker (m_Mtx) ; - if (this->ws_.write (mb , nBytes ) == -1) - { - mb.release (); - ACE_ERROR_RETURN((LM_ERROR, - "%p\n", - "ACE_Asynch_Write_File::write"), - -1); - } - - nIOCount++ ; - return 0; -} - -void -Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) -{ - ACE_DEBUG ((LM_DEBUG, - "handle_read_stream called\n")); - - // Reset pointers. - result.message_block ().rd_ptr ()[result.bytes_transferred ()] = - '\0'; - - ACE_DEBUG ((LM_DEBUG, "********************\n")); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read - ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", - result.bytes_transferred ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) - result.completion_key ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); - ACE_DEBUG ((LM_DEBUG, "********************\n")); - ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", - result.message_block ().rd_ptr ())); - - if ( result.success () && result.bytes_transferred () != 0) - { - // Successful read: write the data to the file asynchronously. - // Note how we reuse the <ACE_Message_Block> for the writing. - // Therefore, we do not delete this buffer because it is handled - // in <handle_write_stream>. - - if(this->initiate_write_stream (result.message_block (), - - result.bytes_transferred () ) == 0 ) - { - if ( duplex != 0 ) - { - // Initiate new read from the stream. - this->initiate_read_stream () ; - } - } - } - else - { - result.message_block ().release (); - ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); - } - - { - ACE_Guard<MyMutex> locker (m_Mtx) ; - nIOCount-- ; - } - check_destroy () ; -} - -void -Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result - &result) -{ - ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n")); - - ACE_DEBUG ((LM_DEBUG, "********************\n")); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", - result.bytes_to_write ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", - result.bytes_transferred ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) - result.completion_key ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); - ACE_DEBUG ((LM_DEBUG, "********************\n")); - - result.message_block ().release (); - - if (result.success ()) - { - // This code is not robust enough to deal with short file writes - // (which hardly ever happen) ;-) - //ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ()); - - if ( duplex == 0 ) - { - initiate_read_stream () ; - } - } - - { - ACE_Guard<MyMutex> locker (m_Mtx) ; - nIOCount-- ; - } - check_destroy () ; -} - -//------------------------------------------------------------------------- -// Sender: sends indefinetely welcome message -// and recieves it back -//------------------------------------------------------------------------ -class Sender : public ACE_Handler -{ -public: -Sender (void); -~Sender (void); -int open (const char *host, u_short port); -void close (); -ACE_HANDLE handle (void) const; - -protected: -// These methods are called by the freamwork - -virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result -&result); -// This is called when asynchronous reads from the socket complete - -virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result -&result); -// This is called when asynchronous writes from the socket complete - -private: - -int initiate_read_stream (void); -int initiate_write_stream (void); - -ACE_SOCK_Stream stream_; -// Network I/O handle - -ACE_Asynch_Write_Stream ws_; -// ws (write stream): for writing to the socket - -ACE_Asynch_Read_Stream rs_; -// rs (read file): for reading from the socket - -ACE_Message_Block welcome_message_; -// Welcome message - -MyMutex m_Mtx ; -long nIOCount ; -}; - -static char *data = "Welcome to Irfan World! Irfan RULES here !!\n"; - -Sender::Sender (void) - :nIOCount ( 0 ) -{ - // Moment of inspiration... :-) - this->welcome_message_.init (data, ACE_OS::strlen (data)); -} - -Sender::~Sender (void) -{ - close (); -} - -void Sender::close () -{ - this->stream_.close (); -} - -ACE_HANDLE Sender::handle (void) const -{ - return this->stream_.get_handle (); -} - -int Sender::open (const char *host, u_short port) -{ - // Initialize stuff - // Connect to remote host - ACE_INET_Addr address (port, host); - ACE_SOCK_Connector connector; - - if (connector.connect (this->stream_, - address) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "ACE_SOCK_Connector::connect"), - -1); - } - - // Open ACE_Asynch_Write_Stream - if (this->ws_.open (*this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "ACE_Asynch_Write_Stream::open"), - -1); - - // Open ACE_Asynch_Read_Stream - if (this->rs_.open (*this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "ACE_Asynch_Read_File::open"), - -1); - - // Start an asynchronous transmit file - if ( this->initiate_write_stream () == -1) - return -1; - - if ( duplex != 0 ) - { - // Start an asynchronous read file - if (this->initiate_read_stream () == -1) - return -1; - } - - return 0; -} - -int Sender::initiate_write_stream (void) -{ - ACE_Guard<MyMutex> locker (m_Mtx) ; - - - welcome_message_.rd_ptr( welcome_message_.base ()); - welcome_message_.wr_ptr( welcome_message_.base ()); - welcome_message_.wr_ptr (ACE_OS::strlen (data)); - - if (this->ws_.write (welcome_message_, - welcome_message_.length () - ) == -1) - { - ACE_ERROR_RETURN((LM_ERROR, - "%p\n", - "ACE_Asynch_Write_File::write"), - -1); - } - - nIOCount++ ; - return 0; -} - -int Sender::initiate_read_stream (void) -{ - ACE_Guard<MyMutex> locker (m_Mtx) ; - - // Create a new <Message_Block>. Note that this message block will - // be used both to <read> data asynchronously from the socket and to - // <write> data asynchronously to the file. - ACE_DEBUG ((LM_DEBUG, - "initiate_read_stream called\n")); - - - ACE_Message_Block *mb = 0; - ACE_NEW_RETURN (mb, - ACE_Message_Block (BUFSIZ + 1), - -1); - - // Inititiate read - if (this->rs_.read (*mb, mb->size ()- 1) == -1) - { - mb->release () ; - ACE_ERROR_RETURN ((LM_ERROR, - "%p\n", - "ACE_Asynch_Read_Stream::read"), - -1); - } - - nIOCount++ ; - return 0; -} - - -void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result - &result) -{ - ACE_DEBUG ((LM_DEBUG, - "handle_write_stream called\n")); - - // Reset pointers. - result.message_block ().rd_ptr (result.message_block ().rd_ptr () - - result.bytes_transferred ()); - - - ACE_DEBUG ((LM_DEBUG, "********************\n")); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", - result.bytes_to_write ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", - result.bytes_transferred ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) - result.completion_key ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); - ACE_DEBUG ((LM_DEBUG, "********************\n")); - ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", - result.message_block ().rd_ptr ())); - - // Simplify just for Test - if (result.success () && result.bytes_transferred () != 0) - { - if ( duplex != 0 ) // full duplex, continue write - { - initiate_write_stream () ; - } - else // half-duplex read reply, after read we will start - // write - { - initiate_read_stream () ; - } - } - - { - ACE_Guard<MyMutex> locker (m_Mtx) ; - nIOCount-- ; - } -} - -void -Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) -{ - ACE_DEBUG ((LM_DEBUG, - "handle_read_stream called\n")); - - // Reset pointers. - result.message_block ().rd_ptr ()[result.bytes_transferred ()] = - '\0'; - - ACE_DEBUG ((LM_DEBUG, "********************\n")); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read - ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", - result.bytes_transferred ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) - result.completion_key ())); - ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); - ACE_DEBUG ((LM_DEBUG, "********************\n")); - ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", - result.message_block ().rd_ptr ())); - - result.message_block().release (); - - if ( result.success () && result.bytes_transferred () != 0) - { - // Successful read: write the data to the file asynchronously. - // Note how we reuse the <ACE_Message_Block> for the writing. - // Therefore, we do not delete this buffer because it is handled - // in <handle_write_stream>. - - if ( duplex != 0 ) // full duplex, continue read - { - initiate_read_stream () ; - } - else // half-duplex writey, after write we will start read - { - initiate_write_stream () ; - } - } - - { - ACE_Guard<MyMutex> locker (m_Mtx) ; - nIOCount-- ; - } -} - -//-------------------------------------------------------------------------- - -static int -parse_args (int argc, char *argv[]) -{ - ACE_Get_Opt get_opt (argc, argv, "n:p:d:h:"); - int c; - - while ((c = get_opt ()) != EOF) - switch (c) - { - case 'h': - host = get_opt.optarg; - break; - case 'n': - nThreads = ACE_OS::atoi (get_opt.optarg) ; - break; - case 'p': - port = ACE_OS::atoi (get_opt.optarg); - break; - case 'd': - duplex = ACE_OS::atoi (get_opt.optarg); - break; - default: - ACE_ERROR ((LM_ERROR, "%p.\n", - "usage :\n" - "-h <host> for Sender mode\n" - "-d <duplex mode 1-on/0-off>\n" - "-p <port to listen/connect>\n" - "-n <number threads for Proactor pool>\n")); - return -1; - } - - return 0; -} - -int -main (int argc, char *argv[]) -{ - ACE_UNUSED_ARG (initial_read_size); - - if (parse_args (argc, argv) == -1) - return -1; - -#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) - - ACE_WIN32_Proactor * pImpl = new ACE_WIN32_Proactor; - -#elif defined (ACE_HAS_AIO_CALLS) - - // ACE_POSIX_AIOCB_Proactor * pImpl = new ACE_POSIX_AIOCB_Proactor; - ACE_POSIX_SIG_Proactor * pImpl = new ACE_POSIX_SIG_Proactor; -#endif - - ACE_Proactor Proactor ( pImpl ,1 ); - - ACE_Proactor::instance( & Proactor ); - - - MyTask Task1 ; - - if (Task1.activate (THR_NEW_LWP, nThreads ) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1); - } - - Sender sender; - ACE_Asynch_Acceptor<Receiver> acceptor; - - int Rc = -1 ; - - if ( host == NULL ) // Acceptor - { - // Simplify , initial read with zero size - Rc = acceptor.open (ACE_INET_Addr (port),0,1); - - } - else - { - Rc = sender.open (host, port); - } - - if ( Rc == 0 ) - { - char c ; - cout << "Press any key to stop and exit=>\n" << flush ; - cin.clear (); - cin >> c ; - } - - ACE_Proactor::end_event_loop () ; - - if ( host != NULL ) // we are sender - { - sender.close () ; // disconnect to get reciever error !!! - } - - - ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance(); - - pTM->wait_task ( & Task1 ) ; - - ACE_Proactor::instance( ( ACE_Proactor* )NULL ); - - return 0; -} -//-------------------------------------------------------------------- -// -//-------------------------------------------------------------------- -int DisableSignal ( int SigNum ) -{ - -#ifndef ACE_WIN32 - sigset_t signal_set; - if ( sigemptyset (&signal_set) == - 1 ) - { - ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%p\n", - "sigemptyset failed")); - } - - sigaddset (&signal_set, SigNum); - - // Put the <signal_set>. - if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0) - { - ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%p\n", - "pthread_sigmask failed")); - } -#endif - - return 1; -} -//-------------------------------------------------------------------- -// Get the <signal_set> back from the OS. -//-------------------------------------------------------------------- - -int PrintSigMask () -{ -#ifndef ACE_WIN32 - - sigset_t mask ; - int member = 0; - - COUT ( "\n=============Signal Mask==========" ) - - if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask ) != 0) - { - ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%p\n", - "ACE_OS::pthread_sigmask failed")); - } - else for (int i = 1 ; i < 1000; i++) - { - member = sigismember (&mask,i); - - COUT ( "\nSig " ) - COUT ( i ) - COUT ( " is " ) - COUT (member ) - - if (member == -1) - { - break ; - } - } - -#endif - return 0; -} - - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Asynch_Acceptor<Receiver>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Asynch_Acceptor<Receiver> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - -#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ |