diff options
-rw-r--r-- | ChangeLog | 13 | ||||
-rw-r--r-- | tests/Makefile.tests | 5 | ||||
-rw-r--r-- | tests/Multihomed_INET_Addr_Test_IPV6.cpp | 36 | ||||
-rw-r--r-- | tests/Proactor_Test_IPV6.cpp | 2109 | ||||
-rw-r--r-- | tests/SOCK_Send_Recv_Test_IPV6.cpp | 392 | ||||
-rw-r--r-- | tests/SOCK_Test_IPv6.cpp | 281 | ||||
-rw-r--r-- | tests/tests.mpc | 7 |
7 files changed, 2828 insertions, 15 deletions
diff --git a/ChangeLog b/ChangeLog index e290a0582ad..b2f6887f64b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,16 @@ +Fri Jan 16 23:01:11 2004 Balachandran Natarajan <bala@dre.vanderbilt.edu> + + * tests/Multihomed_INET_Addr_Test_IPV6.cpp: + * tests/Proactor_Test_IPV6.cpp: + * tests/SOCK_Send_Recv_Test_IPV6.cpp: + * tests/SOCK_Test_IPv6.cpp: + + More IPV6 tests from Brian Bruesker. + + * tests/Makefile.tests: + + Added these new tests. + Fri Jan 16 21:38:59 2004 Balachandran Natarajan <bala@dre.vanderbilt.edu> * tests/INET_Addr_Test_IPV6.cpp: diff --git a/tests/Makefile.tests b/tests/Makefile.tests index 32cdb54e7f3..262dc4508a7 100644 --- a/tests/Makefile.tests +++ b/tests/Makefile.tests @@ -141,7 +141,10 @@ BIN = \ INET_Addr_Test_IPV6 \ Max_Default_Port_Test_IPV6 \ Multicast_Test_IPV6 \ - Multihomed_INET_Addr_Test_IPV6 + Multihomed_INET_Addr_Test_IPV6 \ + Proactor_Test_IPV6 \ + SOCK_Send_Recv_Test_IPV6 \ + SOCK_Test_IPv6 BIN2 = Naming_Test \ diff --git a/tests/Multihomed_INET_Addr_Test_IPV6.cpp b/tests/Multihomed_INET_Addr_Test_IPV6.cpp index e45008378ca..a717fadbaf4 100644 --- a/tests/Multihomed_INET_Addr_Test_IPV6.cpp +++ b/tests/Multihomed_INET_Addr_Test_IPV6.cpp @@ -1,16 +1,24 @@ // $Id$ // ============================================================================ -/** - * @file Multihomed_INET_Addr_Test_IPV6.cpp - * - * @brief Performs several tests on the Multihomed_ACE_INET_Addr class. - * - * It creates several IPV6 addresses and checks that the address - * formed by the class is valid. - * - * @author Edward Mulholland <emulholl@atl.lmco.com> - * Brian Buesker <bbuesker@qualcomm.com> - */ +// +// = LIBRARY +// tests +// +// = FILENAME +// Multihomed_INET_Addr_Test.cpp +// +// = DESCRIPTION +// Performs several tests on the Multihomed_ACE_INET_Addr class. +// It creates several IPv6 addresses and checks that the +// address formed by the class is valid. +// +// = AUTHOR +// Edward Mulholland (emulholl@atl.lmco.com) +// Brian Buesker (bbuesker@qualcomm.com) - Added testing of +// ACE_Multihomed_INET_Addr class +// using IPv6 addresses based on +// Multihomed_INET_Addr_Test. +// // ============================================================================ #include "test_config.h" @@ -110,7 +118,7 @@ int run_main (int argc, ACE_TCHAR *argv[]) } // Pass the in_out array to the accessor - addr.get_secondary_addresses(in_out, i); + addr.get_secondary_addresses(in_out, i); // Check that the in_out array matches stay_out array for (j = 0; j < i; ++j) { @@ -138,7 +146,7 @@ int run_main (int argc, ACE_TCHAR *argv[]) // Check that the primary address in the in_out_sockaddr array // matches the primary address reported by the superclass - if (ACE_OS::memcmp(in_out_sockaddr6, addr.get_addr(), + if (ACE_OS::memcmp(in_out_sockaddr6, addr.get_addr(), sizeof(sockaddr_in6))) { ACE_ERROR ((LM_ERROR, @@ -155,7 +163,7 @@ int run_main (int argc, ACE_TCHAR *argv[]) j < i + 1; ++j, ++pointer6) { - if (ACE_OS::memcmp(pointer6, stay_out[j-1].get_addr(), + if (ACE_OS::memcmp(pointer6, stay_out[j-1].get_addr(), sizeof(sockaddr_in6))) { ACE_ERROR ((LM_ERROR, diff --git a/tests/Proactor_Test_IPV6.cpp b/tests/Proactor_Test_IPV6.cpp new file mode 100644 index 00000000000..4bd5bc3da80 --- /dev/null +++ b/tests/Proactor_Test_IPV6.cpp @@ -0,0 +1,2109 @@ +// $Id$ +// ============================================================================ +/** + * @file Proactor_Test_IPv6.cpp + * + * This program illustrates how the ACE_Proactor can be used to + * implement an application that does various asynchronous + * operations. + * + * @author Alexander Libman <alibman@baltimore.com> + * @author Brian Buesker <bbuesker@qualcomm.com> - modified for IPv6 operation + */ +// ============================================================================ + +#include "test_config.h" + +#if defined (ACE_HAS_THREADS) && ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) + // This only works on Win32 platforms and on Unix platforms + // supporting POSIX aio calls. + +#include "ace/Signal.h" + +#include "ace/Service_Config.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" +#include "ace/Object_Manager.h" +#include "ace/Get_Opt.h" +//#include "ace/streams.h" + +#include "ace/Proactor.h" +#include "ace/Asynch_Acceptor.h" +#include "ace/Asynch_Connector.h" +#include "ace/Task.h" +#include "ace/Thread_Semaphore.h" +#include "ace/OS_NS_signal.h" +#include "ace/OS_NS_errno.h" +#include "ace/os_include/netinet/os_tcp.h" + +#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) + +# include "ace/WIN32_Proactor.h" + +#elif defined (ACE_HAS_AIO_CALLS) + +# include "ace/POSIX_Proactor.h" +# include "ace/POSIX_CB_Proactor.h" +# include "ace/SUN_Proactor.h" + +#endif /* defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) */ + +// Proactor Type (UNIX only, Win32 ignored) +typedef enum { DEFAULT = 0, AIOCB, SIG, SUN, CB } ProactorType; +static ProactorType proactor_type = DEFAULT; + +// POSIX : > 0 max number aio operations proactor, +static size_t max_aio_operations = 0; + +// both: 0 run client or server / depends on host +// != 0 run client and server +static int both = 0; + +// Host that we're connecting to. +static const ACE_TCHAR *host = 0; + +// number of Senders instances +static int senders = 1; +const int MAX_SENDERS = 1000; +const int MAX_RECEIVERS = 1000; + +// duplex mode: == 0 half-duplex +// != 0 full duplex +static int duplex = 0; + +// number threads in the Proactor thread pool +static int threads = 1; + +// Port that we're receiving connections on. +static u_short port = ACE_DEFAULT_SERVER_PORT; + +// Log options +static int loglevel; // 0 full , 1 only errors + +static size_t xfer_limit; // Number of bytes for Sender to send. + +static ACE_TCHAR complete_message[] = + ACE_TEXT ("GET / HTTP/1.1\r\n") + ACE_TEXT ("Accept: */*\r\n") + ACE_TEXT ("Accept-Language: C++\r\n") + ACE_TEXT ("Accept-Encoding: gzip, deflate\r\n") + ACE_TEXT ("User-Agent: Proactor_Test_IPv6/1.0 (non-compatible)\r\n") + ACE_TEXT ("Connection: Keep-Alive\r\n") + ACE_TEXT ("\r\n"); + +class LogLocker +{ +public: + + LogLocker () { ACE_LOG_MSG->acquire (); } + virtual ~LogLocker () { ACE_LOG_MSG->release (); } +}; + + + +// Function to remove signals from the signal mask. +static int +disable_signal (int sigmin, int sigmax) +{ +#ifndef ACE_WIN32 + + sigset_t signal_set; + if (sigemptyset (&signal_set) == - 1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Error: (%P|%t):%p\n"), + ACE_TEXT ("sigemptyset failed"))); + + for (int i = sigmin; i <= sigmax; i++) + sigaddset (&signal_set, i); + + // Put the <signal_set>. + if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Error: (%P|%t):%p\n"), + ACE_TEXT ("pthread_sigmask failed"))); +#else + ACE_UNUSED_ARG (sigmin); + ACE_UNUSED_ARG (sigmax); +#endif /* ACE_WIN32 */ + + return 1; +} + + +// ************************************************************* +// MyTask is ACE_Task resposible for : +// 1. creation and deletion of +// Proactor and Proactor thread pool +// 2. running Proactor event loop +// ************************************************************* + +/** + * @class MyTask + * + * MyTask plays role for Proactor threads pool + * + * MyTask is ACE_Task resposible for: + * 1. Creation and deletion of Proactor and Proactor thread pool + * 2. Running Proactor event loop + */ +class MyTask : public ACE_Task<ACE_MT_SYNCH> +{ +public: + MyTask (void): lock_ (), sem_ (0), proactor_(0) {} + + virtual ~MyTask() + { + (void) this->stop (); + (void) this->delete_proactor(); + } + + virtual int svc (void); + + int start (int num_threads, + ProactorType type_proactor, + size_t max_op ); + int stop (void); + +private: + int create_proactor (ProactorType type_proactor, + size_t max_op); + int delete_proactor (void); + + ACE_SYNCH_RECURSIVE_MUTEX lock_; + ACE_Thread_Semaphore sem_; + ACE_Proactor * proactor_; + +}; + +int +MyTask::create_proactor (ProactorType type_proactor, size_t max_op) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, + monitor, + this->lock_, + -1); + + ACE_ASSERT (this->proactor_ == 0); + +#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) + + ACE_UNUSED_ARG (type_proactor); + ACE_UNUSED_ARG (max_op); + + ACE_WIN32_Proactor *proactor_impl = 0; + + ACE_NEW_RETURN (proactor_impl, + ACE_WIN32_Proactor, + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%t) Create Proactor Type = WIN32\n"))); + +#elif defined (ACE_HAS_AIO_CALLS) + + ACE_POSIX_Proactor * proactor_impl = 0; + + switch (type_proactor) + { + case AIOCB: + ACE_NEW_RETURN (proactor_impl, + ACE_POSIX_AIOCB_Proactor (max_op), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n"))); + break; + +#if defined(ACE_HAS_POSIX_REALTIME_SIGNALS) + case SIG: + ACE_NEW_RETURN (proactor_impl, + ACE_POSIX_SIG_Proactor (max_op), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Create Proactor Type = SIG\n"))); + break; +#endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */ + +# if defined (sun) + case SUN: + ACE_NEW_RETURN (proactor_impl, + ACE_SUN_Proactor (max_op), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%t) Create Proactor Type = SUN\n"))); + break; +# endif /* sun */ + +# if defined (__sgi) + case CB: + ACE_NEW_RETURN (proactor_impl, + ACE_POSIX_CB_Proactor (max_op), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Create Proactor Type = CB\n"))); + break; +# endif + + default: + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n"))); + break; + } + +#endif // (ACE_WIN32) && !defined (ACE_HAS_WINCE) + + // always delete implementation 1 , not !(proactor_impl == 0) + ACE_NEW_RETURN (this->proactor_, + ACE_Proactor (proactor_impl, 1 ), + -1); + // Set new singleton and delete it in close_singleton() + ACE_Proactor::instance (this->proactor_, 1); + return 0; +} + +int +MyTask::delete_proactor (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, + monitor, + this->lock_, + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Delete Proactor\n"))); + + ACE_Proactor::close_singleton (); + this->proactor_ = 0; + + return 0; +} + +int +MyTask::start (int num_threads, + ProactorType type_proactor, + size_t max_op) +{ + if (this->create_proactor (type_proactor, max_op) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to create proactor")), + -1); + + if (this->activate (THR_NEW_LWP, num_threads) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to activate thread pool")), + -1); + + for (; num_threads > 0; num_threads--) + { + sem_.acquire (); + } + + return 0; +} + + +int +MyTask::stop () +{ + if (this->proactor_ != 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Calling End Proactor event loop\n"))); + + ACE_Proactor::end_event_loop (); + } + + if (this->wait () == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to stop thread pool"))); + + return 0; +} + +int +MyTask::svc (void) +{ + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n"))); + + disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX); + + // signal that we are ready + sem_.release (1); + + ACE_Proactor::run_event_loop (); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n"))); + return 0; +} + +// ************************************************************* +// Receiver and Acceptor +// ************************************************************* +// forward declaration +class Acceptor; + +class Receiver : public ACE_Service_Handler +{ + friend class Acceptor; +public: + Receiver (Acceptor *acceptor = 0, int index = -1); + ~Receiver (void); + + size_t get_total_snd (void) { return this->total_snd_; } + size_t get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } + + // This is called to pass the new connection's addresses. + virtual void addresses (const ACE_INET_Addr& peer, + const ACE_INET_Addr& local); + + /// This is called after the new connection has been accepted. + virtual void open (ACE_HANDLE handle, + ACE_Message_Block &message_block); + +protected: + /** + * @name AIO callback handling + * + * These methods are called by the framework + */ + /// This is called when asynchronous <read> operation from the + /// socket completes. + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + + /// This is called when an asynchronous <write> to the socket + /// completes. + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + +private: + int initiate_read_stream (void); + int initiate_write_stream (ACE_Message_Block &mb, size_t nbytes); + void cancel (); + + Acceptor *acceptor_; + int index_; + + ACE_Asynch_Read_Stream rs_; + ACE_Asynch_Write_Stream ws_; + ACE_HANDLE handle_; + ACE_SYNCH_MUTEX lock_; + + long io_count_; // Number of currently outstanding I/O requests + int flg_cancel_; + size_t total_snd_; // Number of bytes successfully sent + size_t total_rcv_; // Number of bytes successfully received + long total_w_; // Number of write operations + long total_r_; // Number of read operations +}; + +class Acceptor : public ACE_Asynch_Acceptor<Receiver> +{ + friend class Receiver; +public: + int get_number_sessions (void) { return this->sessions_; } + size_t get_total_snd (void) { return this->total_snd_; } + size_t get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } + + Acceptor (void); + virtual ~Acceptor (void); + + void stop (void); + void cancel_all (void); + + // Virtual from ACE_Asynch_Acceptor + Receiver *make_handler (void); + +private: + void on_new_receiver (Receiver &rcvr); + void on_delete_receiver (Receiver &rcvr); + + ACE_SYNCH_RECURSIVE_MUTEX lock_; + int sessions_; + Receiver *list_receivers_[MAX_RECEIVERS]; + size_t total_snd_; + size_t total_rcv_; + long total_w_; + long total_r_; +}; + +// ************************************************************* +Acceptor::Acceptor (void) + : sessions_ (0), + total_snd_(0), + total_rcv_(0), + total_w_ (0), + total_r_ (0) +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + for (int i = 0; i < MAX_RECEIVERS; ++i) + this->list_receivers_[i] = 0; +} + +Acceptor::~Acceptor (void) +{ + this->stop (); +} + + +void +Acceptor::cancel_all (void) +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + this->cancel (); + + for (int i = 0; i < MAX_RECEIVERS; ++i) + { + if (this->list_receivers_[i] != 0) + this->list_receivers_[i]->cancel (); + } + return; +} + + +void +Acceptor::stop (void) +{ + // This method can be called only after proactor event loop is done + // in all threads. + + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + for (int i = 0; i < MAX_RECEIVERS; ++i) + { + delete this->list_receivers_[i]; + this->list_receivers_[i] = 0; + } +} + +void +Acceptor::on_new_receiver (Receiver & rcvr) +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + this->sessions_++; + this->list_receivers_[rcvr.index_] = &rcvr; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Acceptor: receiver %d up; now have %d.\n"), + rcvr.index_, + this->sessions_)); +} + +void +Acceptor::on_delete_receiver (Receiver & rcvr) +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + this->sessions_--; + + this->total_snd_ += rcvr.get_total_snd(); + this->total_rcv_ += rcvr.get_total_rcv(); + this->total_w_ += rcvr.get_total_w(); + this->total_r_ += rcvr.get_total_r(); + + if (rcvr.index_ >= 0 + && rcvr.index_ < MAX_RECEIVERS + && this->list_receivers_[rcvr.index_] == &rcvr) + this->list_receivers_[rcvr.index_] = 0; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Acceptor: receiver %d gone; %d remain\n"), + rcvr.index_, + this->sessions_)); +} + +Receiver * +Acceptor::make_handler (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, 0); + + if (this->sessions_ >= MAX_RECEIVERS) + return 0; + + for (int i = 0; i < MAX_RECEIVERS; ++i) + { + if (this->list_receivers_[i] == 0) + { + ACE_NEW_RETURN (this->list_receivers_[i], + Receiver (this, i), + 0); + return this->list_receivers_[i]; + } + } + + return 0; +} +// *************************************************** +Receiver::Receiver (Acceptor * acceptor, int index) + : acceptor_ (acceptor), + index_ (index), + handle_ (ACE_INVALID_HANDLE), + io_count_ (0), + flg_cancel_(0), + total_snd_(0), + total_rcv_(0), + total_w_ (0), + total_r_ (0) +{ + if (this->acceptor_ != 0) + this->acceptor_->on_new_receiver (*this); +} + +Receiver::~Receiver (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d dtor; %d sends (%d bytes); ") + ACE_TEXT ("%d recvs (%d bytes)\n"), + this->index_, + this->total_w_, this->total_snd_, + this->total_r_, this->total_rcv_)); + if (this->io_count_ != 0) + ACE_ERROR ((LM_WARNING, + ACE_TEXT ("(%t) Receiver %d deleted with ") + ACE_TEXT ("%d I/O outstanding\n"), + this->index_, + this->io_count_)); + + // This test bounces data back and forth between Senders and Receivers. + // Therefore, if there was significantly more data in one direction, that's + // a problem. Remember, the byte counts are unsigned values. + int issue_data_warning = 0; + if (this->total_snd_ > this->total_rcv_) + { + if (this->total_rcv_ == 0) + issue_data_warning = 1; + else if (this->total_snd_ / this->total_rcv_ > 2) + issue_data_warning = 1; + } + else + { + if (this->total_snd_ == 0) + issue_data_warning = 1; + else if (this->total_rcv_ / this->total_snd_ > 2) + issue_data_warning = 1; + } + if (issue_data_warning) + ACE_DEBUG ((LM_WARNING, + ACE_TEXT ("(%t) Above byte counts look odd; need review\n"))); + + if (this->acceptor_ != 0) + this->acceptor_->on_delete_receiver (*this); + + if (this->handle_ != ACE_INVALID_HANDLE) + ACE_OS::closesocket (this->handle_); + + this->index_ = -1; + this->handle_= ACE_INVALID_HANDLE; +} + +void +Receiver::cancel () +{ + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); + + this->flg_cancel_ = 1; + this->ws_.cancel (); + this->rs_.cancel (); + return; +} + + +void +Receiver::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&) +{ + ACE_TCHAR str[256]; + if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR))) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d connection from %s\n"), + this->index_, + str)); + else + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"), + this->index_, + ACE_TEXT ("addr_to_string"))); + return; +} + + +void +Receiver::open (ACE_HANDLE handle, ACE_Message_Block &) +{ + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); + + // Don't buffer serial sends. + this->handle_ = handle; + int nodelay = 1; + ACE_SOCK_Stream option_setter (handle); + if (-1 == option_setter.set_option (ACE_IPPROTO_TCP, + TCP_NODELAY, + &nodelay, + sizeof (nodelay))) + ACE_ERROR ((LM_ERROR, "%p\n", "set_option")); + + if (this->ws_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open"))); + else if (this->rs_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open"))); + else + this->initiate_read_stream (); + + if (this->io_count_ > 0) + return; + } + delete this; +} + +int +Receiver::initiate_read_stream (void) +{ + if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) + return -1; + + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (1024), //BUFSIZ + 1), + -1); + + // Inititiate read + if (this->rs_.read (*mb, mb->size () - 1) == -1) + { + mb->release (); +#if defined (ACE_WIN32) + // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get + // a 0-byte read as we would if underlying calls used WSARecv. + if (ACE_OS::last_error () == ERROR_NETNAME_DELETED) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d, peer closed\n"), + this->index_), + -1); +#endif /* ACE_WIN32 */ + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) Receiver %d, %p\n"), + this->index_, + ACE_TEXT ("read")), + -1); + } + + this->io_count_++; + this->total_r_++; + return 0; +} + +int +Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes) +{ + if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) + { + mb.release (); + return -1; + } + + if (nbytes == 0) + { + mb.release (); + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("(%t) Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")), + -1); + } + + if (this->ws_.write (mb, nbytes) == -1) + { + mb.release (); +#if defined (ACE_WIN32) + // On peer close, WriteFile will yield ERROR_NETNAME_DELETED. + if (ACE_OS::last_error () == ERROR_NETNAME_DELETED) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d, peer gone\n"), + this->index_), + -1); +#endif /* ACE_WIN32 */ + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("(%t) Receiver %d, %p\n"), + this->index_, + ACE_TEXT ("write")), + -1); + } + + this->io_count_++; + this->total_w_++; + return 0; +} + +void +Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ ); + + ACE_Message_Block & mb = result.message_block (); + + // Reset pointers. + mb.rd_ptr ()[result.bytes_transferred ()] = '\0'; + + if (loglevel > 1) + { + LogLocker log_lock; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) **** Receiver %d: handle_read_stream() ****\n"), + this->index_)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_to_read"), + result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("handle"), + result.handle ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_transfered"), + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %@\n"), + ACE_TEXT ("act"), + result.act ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("success"), + result.success ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %@\n"), + ACE_TEXT ("completion_key"), + result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("error"), + result.error ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %s\n"), + ACE_TEXT ("message_block"), + mb.rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** end of message ****************\n"))); + } + else if (result.error () != 0) + { + ACE_Log_Priority prio; +#if defined (ACE_WIN32) + if (result.error () == ERROR_OPERATION_ABORTED) + prio = LM_DEBUG; +#else + if (result.error () == ECANCELED) + prio = LM_DEBUG; +#endif /* ACE_WIN32 */ + else + prio = LM_ERROR; + ACE_Log_Msg::instance ()->errnum (result.error ()); + ACE_Log_Msg::instance ()->log (prio, + ACE_TEXT ("(%t) Receiver %d; %p\n"), + this->index_, + ACE_TEXT ("read")); + } + else if (loglevel > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d: read %d bytes\n"), + this->index_, + result.bytes_transferred ())); + } + + if (result.error () == 0 && result.bytes_transferred () > 0) + { + this->total_rcv_ += result.bytes_transferred (); + + if (this->initiate_write_stream (mb, + result.bytes_transferred ()) == 0) + { + if (duplex != 0) // Initiate new read from the stream. + this->initiate_read_stream (); + } + } + else + mb.release (); + + this->io_count_--; + if (this->io_count_ > 0) + return; + } + delete this; +} + +void +Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); + + ACE_Message_Block & mb = result.message_block (); + + if (loglevel > 1) + { + LogLocker log_lock; + + //mb.rd_ptr () [0] = '\0'; + mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) **** Receiver %d: handle_write_stream() ****\n"), + this->index_)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_to_write"), + result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("handle"), + result.handle ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_transfered"), + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %@\n"), + ACE_TEXT ("act"), + result.act ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("success"), + result.success ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %@\n"), + ACE_TEXT ("completion_key"), + result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("error"), + result.error ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %s\n"), + ACE_TEXT ("message_block"), + mb.rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** end of message ****************\n"))); + } + else if (result.error () != 0) + { + ACE_Log_Priority prio; +#if defined (ACE_WIN32) + if (result.error () == ERROR_OPERATION_ABORTED) + prio = LM_DEBUG; +#else + if (result.error () == ECANCELED) + prio = LM_DEBUG; +#endif /* ACE_WIN32 */ + else + prio = LM_ERROR; + ACE_Log_Msg::instance ()->errnum (result.error ()); + ACE_Log_Msg::instance ()->log (prio, + ACE_TEXT ("(%t) Receiver %d; %p\n"), + this->index_, + ACE_TEXT ("write")); + } + else if (loglevel > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d: wrote %d bytes ok\n"), + this->index_, + result.bytes_transferred ())); + } + + mb.release (); + + if (result.error () == 0 && result.bytes_transferred () > 0) + { + this->total_snd_ += result.bytes_transferred (); + + if (duplex == 0) + this->initiate_read_stream (); + } + + this->io_count_--; + if (this->io_count_ > 0) + return; + } + delete this; +} + +// ******************************************* +// Sender +// ******************************************* + +class Connector; + +class Sender : public ACE_Service_Handler +{ + friend class Connector; +public: + + /// This is called after the new connection has been established. + virtual void open (ACE_HANDLE handle, + ACE_Message_Block &message_block); + + Sender (Connector *connector = 0, int index = -1); + ~Sender (void); + + size_t get_total_snd (void) { return this->total_snd_; } + size_t get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } + + // This is called to pass the new connection's addresses. + virtual void addresses (const ACE_INET_Addr& peer, + const ACE_INET_Addr& local); + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // This is called when asynchronous reads from the socket complete + + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This is called when asynchronous writes from the socket complete + +private: + int initiate_read_stream (void); + int initiate_write_stream (void); + void cancel (void); + void close (void); + + int index_; + Connector * connector_; + + ACE_Asynch_Read_Stream rs_; + ACE_Asynch_Write_Stream ws_; + ACE_HANDLE handle_; + + ACE_SYNCH_MUTEX lock_; + + long io_count_; + int stop_writing_; // Writes are shut down; just read. + int flg_cancel_; + size_t total_snd_; + size_t total_rcv_; + long total_w_; + long total_r_; +}; + +class Connector : public ACE_Asynch_Connector<Sender> +{ + friend class Sender; +public: + int get_number_sessions (void) { return this->sessions_; } + size_t get_total_snd (void) { return this->total_snd_; } + size_t get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } + + Connector (void); + virtual ~Connector (void); + + int start (const ACE_INET_Addr &addr, int num); + void stop (void); + void cancel_all (void); + + // Virtual from ACE_Asynch_Connector + Sender *make_handler (void); + +private: + void on_new_sender (Sender &rcvr); + void on_delete_sender (Sender &rcvr); + + ACE_SYNCH_RECURSIVE_MUTEX lock_; + int sessions_; + Sender *list_senders_[MAX_SENDERS]; + size_t total_snd_; + size_t total_rcv_; + long total_w_; + long total_r_; +}; + +// ************************************************************* + +Connector::Connector (void) + : sessions_ (0), + total_snd_(0), + total_rcv_(0), + total_w_ (0), + total_r_ (0) +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + for (int i = 0; i < MAX_SENDERS; ++i) + this->list_senders_[i] = 0; +} + +Connector::~Connector (void) +{ + this->stop (); +} + + +void +Connector::cancel_all(void) +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + this->cancel (); + + for (int i = 0; i < MAX_SENDERS; ++i) + { + if (this->list_senders_[i] != 0) + this->list_senders_[i]->cancel (); + } + return; +} + + +void +Connector::stop (void) +{ + // This method can be called only after proactor event loop is done + // in all threads. + + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + for (int i = 0; i < MAX_SENDERS; ++i) + { + delete this->list_senders_[i]; + this->list_senders_[i] = 0; + } +} + +void +Connector::on_new_sender (Sender &sndr) +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + this->sessions_++; + this->list_senders_[sndr.index_] = &sndr; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Connector: sender %d up; now have %d.\n"), + sndr.index_, + this->sessions_)); +} + +void +Connector::on_delete_sender (Sender &sndr) +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + this->sessions_--; + this->total_snd_ += sndr.get_total_snd(); + this->total_rcv_ += sndr.get_total_rcv(); + this->total_w_ += sndr.get_total_w(); + this->total_r_ += sndr.get_total_r(); + + if (sndr.index_ >= 0 + && sndr.index_ < MAX_SENDERS + && this->list_senders_[sndr.index_] == &sndr) + this->list_senders_[sndr.index_] = 0; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Connector: sender %d gone; %d remain\n"), + sndr.index_, + this->sessions_)); +} + +Sender * +Connector::make_handler (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, 0); + + if (this->sessions_ >= MAX_SENDERS) + return 0; + + for (int i = 0; i < MAX_SENDERS; ++i) + { + if (this->list_senders_ [i] == 0) + { + ACE_NEW_RETURN (this->list_senders_[i], + Sender (this, i), + 0); + return this->list_senders_[i]; + } + } + + return 0; +} + + +int +Connector::start (const ACE_INET_Addr& addr, int num) +{ + + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, 0); + + if (num > MAX_SENDERS) + num = MAX_SENDERS; + + if (num < 0) + num = 1; + + int rc = 0; + + // int open ( int pass_addresses = 0, + // ACE_Proactor *proactor = 0, + // int validate_new_connection = 0 ); + + if (this->open (1, 0, 1) != 0) + { + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("(%t) %p\n"), + ACE_LIB_TEXT ("Connector::open failed"))); + return rc; + } + + for (; rc < num; rc++) + { + ACE_INET_Addr localAddr; + if (this->connect (addr, localAddr) != 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Connector::connect failed for IPv6"))); + break; + } + } + return rc; +} + + +Sender::Sender (Connector * connector, int index) + : index_ (index), + connector_ (connector), + handle_ (ACE_INVALID_HANDLE), + io_count_ (0), + stop_writing_ (0), + flg_cancel_ (0), + total_snd_ (0), + total_rcv_ (0), + total_w_ (0), + total_r_ (0) +{ + if (this->connector_ != 0) + this->connector_->on_new_sender (*this); +} + +Sender::~Sender (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d dtor; %d sends (%d bytes); ") + ACE_TEXT ("%d recvs (%d bytes)\n"), + this->index_, + this->total_w_, this->total_snd_, + this->total_r_, this->total_rcv_)); + if (this->io_count_ != 0) + ACE_ERROR ((LM_WARNING, + ACE_TEXT ("(%t) Sender %d deleted with %d I/O outstanding\n"), + this->index_, + this->io_count_)); + + // This test bounces data back and forth between Senders and Receivers. + // Therefore, if there was significantly more data in one direction, that's + // a problem. Remember, the byte counts are unsigned values. + int issue_data_warning = 0; + if (this->total_snd_ > this->total_rcv_) + { + if (this->total_rcv_ == 0) + issue_data_warning = 1; + else if (this->total_snd_ / this->total_rcv_ > 2) + issue_data_warning = 1; + } + else + { + if (this->total_snd_ == 0) + issue_data_warning = 1; + else if (this->total_rcv_ / this->total_snd_ > 2) + issue_data_warning = 1; + } + if (issue_data_warning) + ACE_DEBUG ((LM_WARNING, + ACE_TEXT ("(%t) Above byte counts look odd; need review\n"))); + + if (this->connector_ != 0) + this->connector_->on_delete_sender (*this); + + if (this->handle_ != ACE_INVALID_HANDLE) + { + ACE_OS::closesocket (this->handle_); + } + + this->index_ = -1; + this->handle_= ACE_INVALID_HANDLE; +} + +void +Sender::cancel () +{ + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); + + this->flg_cancel_ = 1; + this->ws_.cancel (); + this->rs_.cancel (); + return; +} + +void +Sender::close () +{ + // This must be called with the lock_ held. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Closing Sender %d writes; %d I/O outstanding\n"), + this->index_, this->io_count_)); + ACE_OS::shutdown (this->handle_, ACE_SHUTDOWN_WRITE); + this->stop_writing_ = 1; + return; +} + + +void +Sender::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr& local) +{ + ACE_TCHAR str[256]; + ACE_TCHAR str2[256]; + ACE_INET_Addr addr ((u_short) 0, host); + + // This checks to make sure the peer address given to us matches what + // we expect it to be. + // This check will fail when Asynch_Connector::parse_addresses does + // not handle IPv6 addresses + if (0 != peer.get_host_addr (str, sizeof (str) / sizeof (ACE_TCHAR))) + { + if (0 != addr.get_host_addr (str2, sizeof (str2) / sizeof (ACE_TCHAR))) + { + if (0 != strncmp (str, str2, sizeof (str) / sizeof (ACE_TCHAR))) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) Sender %d peer address (%s) does not " + "match host address (%s)\n"), + this->index_, + str, str2)); + return; + } + } + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) Sender %d unable to convert host addr\n"), + this->index_)); + return; + } + } + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) Sender %d unable to convert peer addr\n"), + this->index_)); + return; + } + + if (0 == local.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR))) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d connected on %s\n"), + this->index_, + str)); + } + else + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"), + this->index_, + ACE_TEXT ("addr_to_string"))); + return; +} + + +void +Sender::open (ACE_HANDLE handle, ACE_Message_Block &) +{ + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); + + // Don't buffer serial sends. + this->handle_ = handle; + int nodelay = 1; + ACE_SOCK_Stream option_setter (handle); + if (option_setter.set_option (ACE_IPPROTO_TCP, + TCP_NODELAY, + &nodelay, + sizeof (nodelay))) + ACE_ERROR ((LM_ERROR, "%p\n", "set_option")); + + // Open ACE_Asynch_Write_Stream + if (this->ws_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open"))); + + // Open ACE_Asynch_Read_Stream + else if (this->rs_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open"))); + + else if (this->initiate_write_stream () == 0) + { + if (duplex != 0) // Start an asynchronous read + this->initiate_read_stream (); + } + + if (this->io_count_ > 0) + return; + } + delete this; +} + +int +Sender::initiate_write_stream (void) +{ + if (this->flg_cancel_ != 0 || + this->stop_writing_ || + this->handle_ == ACE_INVALID_HANDLE) + return -1; + + static const size_t complete_message_length = ACE_OS::strlen (complete_message); + +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) + + ACE_Message_Block *mb1 = 0, + *mb2 = 0, + *mb3 = 0; + + // No need to allocate +1 for proper printing - the memory includes it already + ACE_NEW_RETURN (mb1, + ACE_Message_Block ((char *)complete_message, + complete_message_length), + -1); + + ACE_NEW_RETURN (mb2, + ACE_Message_Block ((char *)complete_message, + complete_message_length), + -1); + + ACE_NEW_RETURN (mb3, + ACE_Message_Block ((char *)complete_message, + complete_message_length), + -1); + + mb1->wr_ptr (complete_message_length); + mb2->wr_ptr (complete_message_length); + mb3->wr_ptr (complete_message_length); + + // chain them together + mb1->cont (mb2); + mb2->cont (mb3); + + if (this->ws_.writev (*mb1, mb1->total_length ()) == -1) + { + mb1->release (); + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")), + -1); + } +#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ + + ACE_Message_Block *mb = 0; + + // No need to allocate +1 for proper printing - the memory includes it already + ACE_NEW_RETURN (mb, + ACE_Message_Block (complete_message, complete_message_length), + -1); + mb->wr_ptr (complete_message_length); + + if (this->ws_.write (*mb, mb->length ()) == -1) + { + mb->release (); +#if defined (ACE_WIN32) + // On peer close, WriteFile will yield ERROR_NETNAME_DELETED. + if (ACE_OS::last_error () == ERROR_NETNAME_DELETED) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d, peer gone\n"), + this->index_), + -1); +#endif /* ACE_WIN32 */ + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("(%t) Sender %d, %p\n"), + this->index_, + ACE_TEXT ("write")), + -1); + } +#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ + + this->io_count_++; + this->total_w_++; + return 0; +} + +int +Sender::initiate_read_stream (void) +{ + if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) + return -1; + + static const size_t complete_message_length = ACE_OS::strlen (complete_message); + +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) + ACE_Message_Block *mb1 = 0, + *mb2 = 0, + *mb3 = 0, + *mb4 = 0, + *mb5 = 0, + *mb6 = 0; + + // We allocate +1 only for proper printing - we can just set the last byte + // to '\0' before printing out + ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1); + ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1); + ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1); + + // Let allocate memory for one more triplet, + // This improves performance + // as we can receive more the than one block at once + // Generally, we can receive more triplets .... + ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1); + ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1); + ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1); + + mb1->cont (mb2); + mb2->cont (mb3); + + mb3->cont (mb4); + mb4->cont (mb5); + mb5->cont (mb6); + + + // hide last byte in each message block, reserving it for later to set '\0' + // for proper printouts + mb1->size (mb1->size () - 1); + mb2->size (mb2->size () - 1); + mb3->size (mb3->size () - 1); + + mb4->size (mb4->size () - 1); + mb5->size (mb5->size () - 1); + mb6->size (mb6->size () - 1); + + // Inititiate read + if (this->rs_.readv (*mb1, mb1->total_size () - 1) == -1) + { + mb1->release (); + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::readv")), + -1); + } +#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ + + // Try to read more chunks + size_t blksize = ( complete_message_length > BUFSIZ ) ? + complete_message_length : BUFSIZ; + + ACE_Message_Block *mb = 0; + + // We allocate +1 only for proper printing - we can just set the last byte + // to '\0' before printing out + ACE_NEW_RETURN (mb, + ACE_Message_Block (blksize + 1) + , -1); + + // Inititiate read + if (this->rs_.read (*mb, mb->size () - 1) == -1) + { + mb->release (); +#if defined (ACE_WIN32) + // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get + // a 0-byte read as we would if underlying calls used WSARecv. + if (ACE_OS::last_error () == ERROR_NETNAME_DELETED) + ACE_ERROR_RETURN ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d, peer closed\n"), + this->index_), + -1); +#endif /* ACE_WIN32 */ + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) Sender %d, %p\n"), + this->index_, + ACE_TEXT ("read")), + -1); + } +#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ + + this->io_count_++; + this->total_r_++; + return 0; +} + +void +Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); + + ACE_Message_Block & mb = result.message_block (); + + if (loglevel > 1) + { + LogLocker log_lock; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) **** Sender %d: handle_write_stream() ****\n"), + index_)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_to_write"), + result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("handle"), + result.handle ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_transfered"), + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %@\n"), + ACE_TEXT ("act"), + result.act ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("success"), + result.success ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %@\n"), + ACE_TEXT ("completion_key"), + result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("error"), + result.error ())); + +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) + size_t bytes_transferred = result.bytes_transferred (); + char index = 0; + for (ACE_Message_Block* mb_i = &mb; + (mb_i != 0) && (bytes_transferred > 0); + mb_i = mb_i->cont ()) + { + // write 0 at string end for proper printout (if end of mb, it's 0 already) + mb_i->rd_ptr()[0] = '\0'; + + size_t len = mb_i->rd_ptr () - mb_i->base (); + + // move rd_ptr backwards as required for printout + if (len >= bytes_transferred) + { + mb_i->rd_ptr (0 - bytes_transferred); + bytes_transferred = 0; + } + else + { + mb_i->rd_ptr (0 - len); + bytes_transferred -= len; + } + + ++index; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s%d = %s\n"), + ACE_TEXT ("message_block, part "), + index, + mb_i->rd_ptr ())); + } +#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ + // write 0 at string end for proper printout (if end of mb, it's 0 already) + mb.rd_ptr()[0] = '\0'; + // move rd_ptr backwards as required for printout + mb.rd_ptr (- result.bytes_transferred ()); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %s\n"), + ACE_TEXT ("message_block"), + mb.rd_ptr ())); +#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** end of message ****************\n"))); + } + else if (result.error () != 0) + { + ACE_Log_Priority prio; +#if defined (ACE_WIN32) + if (result.error () == ERROR_OPERATION_ABORTED) + prio = LM_DEBUG; +#else + if (result.error () == ECANCELED) + prio = LM_DEBUG; +#endif /* ACE_WIN32 */ + else + prio = LM_ERROR; + ACE_Log_Msg::instance ()->errnum (result.error ()); + ACE_Log_Msg::instance ()->log (prio, + ACE_TEXT ("(%t) Sender %d; %p\n"), + this->index_, + ACE_TEXT ("write")); + } + else if (loglevel > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d: wrote %d bytes ok\n"), + this->index_, + result.bytes_transferred ())); + } + + mb.release (); + + if (result.error () == 0 && result.bytes_transferred () > 0) + { + this->total_snd_ += result.bytes_transferred (); + if (this->total_snd_ >= xfer_limit) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d sent %d, limit %d\n"), + this->index_, this->total_snd_, xfer_limit)); + this->close (); + } + if (duplex != 0) // full duplex, continue write + { + if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control + this->initiate_write_stream (); + } + else // half-duplex read reply, after read we will start write + this->initiate_read_stream (); + } + + this->io_count_--; + if (this->io_count_ > 0) + return; + } + delete this; +} + +void +Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); + + ACE_Message_Block & mb = result.message_block (); + + if (loglevel > 1) + { + LogLocker log_lock; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) **** Sender %d: handle_read_stream() ****\n"), + index_)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_to_read"), + result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("handle"), + result.handle ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_transfered"), + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %@\n"), + ACE_TEXT ("act"), + result.act ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("success"), + result.success ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %@\n"), + ACE_TEXT ("completion_key"), + result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("error"), + result.error ())); + +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) + char index = 0; + for (ACE_Message_Block* mb_i = &mb; + mb_i != 0; + mb_i = mb_i->cont ()) + { + ++index; + // write 0 at string end for proper printout + mb_i->wr_ptr()[0] = '\0'; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s%d = %s\n"), + ACE_TEXT ("message_block, part "), + index, + mb_i->rd_ptr ())); + } +#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ + // write 0 at string end for proper printout + mb.rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %s\n"), + ACE_TEXT ("message_block"), + mb.rd_ptr ())); +#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** end of message ****************\n"))); + } + else if (result.error () != 0) + { + ACE_Log_Priority prio; +#if defined (ACE_WIN32) + if (result.error () == ERROR_OPERATION_ABORTED) + prio = LM_DEBUG; +#else + if (result.error () == ECANCELED) + prio = LM_DEBUG; +#endif /* ACE_WIN32 */ + else + prio = LM_ERROR; + ACE_Log_Msg::instance ()->errnum (result.error ()); + ACE_Log_Msg::instance ()->log (prio, + ACE_TEXT ("(%t) Sender %d; %p\n"), + this->index_, + ACE_TEXT ("read")); + } + else if (loglevel > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d: read %d bytes ok\n"), + this->index_, + result.bytes_transferred ())); + } + + mb.release (); + + if (result.error () == 0 && result.bytes_transferred () > 0) + { + this->total_rcv_ += result.bytes_transferred (); + + if (duplex != 0 || this->stop_writing_) // full duplex, continue read + this->initiate_read_stream (); + else // half-duplex write, after write we will start read + this->initiate_write_stream (); + } + + this->io_count_--; + if (this->io_count_ > 0) + return; + } + delete this; +} + +// ************************************************************* +// Configuration helpers +// ************************************************************* +int +print_usage (int /* argc */, ACE_TCHAR *argv[]) +{ + ACE_ERROR + ((LM_ERROR, + ACE_TEXT ("\nusage: %s") + ACE_TEXT ("\n-o <max number of started aio operations for Proactor>") + ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:") + ACE_TEXT ("\n a AIOCB") + ACE_TEXT ("\n i SIG") + ACE_TEXT ("\n c CB") + ACE_TEXT ("\n s SUN") + ACE_TEXT ("\n d default") + ACE_TEXT ("\n-d <duplex mode 1-on/0-off>") + ACE_TEXT ("\n-h <host> for Sender mode") + ACE_TEXT ("\n-n <number threads for Proactor pool>") + ACE_TEXT ("\n-p <port to listen/connect>") + ACE_TEXT ("\n-s <number of sender's instances>") + ACE_TEXT ("\n-b run client and server at the same time") + ACE_TEXT ("\n f file") + ACE_TEXT ("\n c console") + ACE_TEXT ("\n-v log level") + ACE_TEXT ("\n 0 - log errors and highlights") + ACE_TEXT ("\n 1 - log level 0 plus progress information") + ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results") + ACE_TEXT ("\n-x max transfer byte count per Sender") + ACE_TEXT ("\n-u show this message") + ACE_TEXT ("\n"), + argv[0] + )); + return -1; +} + +static int +set_proactor_type (const ACE_TCHAR *ptype) +{ + if (!ptype) + return 0; + + switch (toupper (*ptype)) + { + case 'D': + proactor_type = DEFAULT; + return 1; + case 'A': + proactor_type = AIOCB; + return 1; + case 'I': + proactor_type = SIG; + return 1; +#if defined (sun) + case 'S': + proactor_type = SUN; + return 1; +#endif /* sun */ +#if defined (__sgi) + case 'C': + proactor_type = CB; + return 1; +#endif /* __sgi */ + default: + break; + } + return 0; +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + // First, set up all the defaults then let any args change them. + both = 1; // client and server simultaneosly + duplex = 1; // full duplex is on + host = ACE_IPV6_LOCALHOST; // server to connect (IPv6 localhost) + port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen + max_aio_operations = 512; // POSIX Proactor params + proactor_type = DEFAULT; // Proactor type = default + threads = 3; // size of Proactor thread pool + senders = 10; // number of senders + loglevel = 0; // log level : only errors and highlights + // Default transfer limit 50 messages per Sender + xfer_limit = 50 * ACE_OS::strlen (complete_message); + + if (argc == 1) // no arguments , so one button test + return 0; + + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:s:v:ub")); + int c; + + while ((c = get_opt ()) != EOF) + { + switch (c) + { + case 'x': // xfer limit + xfer_limit = ACE_static_cast (size_t, + ACE_OS::atoi (get_opt.opt_arg ())); + if (xfer_limit == 0) + xfer_limit = 1; // Bare minimum. + break; + case 'b': // both client and server + both = 1; + break; + case 'v': // log level + loglevel = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'd': // duplex + duplex = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'h': // host for sender + host = get_opt.opt_arg (); + break; + case 'p': // port number + port = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'n': // thread pool size + threads = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 's': // number of senders + senders = ACE_OS::atoi (get_opt.opt_arg ()); + if (senders > MAX_SENDERS) + senders = MAX_SENDERS; + break; + case 'o': // max number of aio for proactor + max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 't': // Proactor Type + if (set_proactor_type (get_opt.opt_arg ())) + break; + return print_usage (argc, argv); + case 'u': + default: + return print_usage (argc, argv); + } // switch + } // while + + if (proactor_type == SUN && threads > 1) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sun aiowait is not thread-safe; ") + ACE_TEXT ("changing to 1 thread\n"))); + threads = 1; + } + + return 0; +} + +int +run_main (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPv6")); + + if (::parse_args (argc, argv) == -1) + return -1; + +#if defined (ACE_HAS_IPV6) + disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX); + disable_signal (SIGPIPE, SIGPIPE); + + MyTask task1; + Acceptor acceptor; + Connector connector; + + int rc = 0; + + if (task1.start (threads, + proactor_type, + max_aio_operations) == 0) + { + + if (both != 0 || host == 0) // Acceptor + { + if (acceptor.open (ACE_INET_Addr (port, "::"), 0, 1) == 0) + rc = 1; + } + + if (both != 0 || host != 0) + { + ACE_INET_Addr addr; + if (host == 0) + host = ACE_IPV6_LOCALHOST; + + if (addr.set (port, host) == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host)); + else + rc += connector.start (addr, senders); + } + } + + // Wait a couple of seconds to let things get going, then poll til + // all sessions are done. + ACE_OS::sleep (2); + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n"))); + while (acceptor.get_number_sessions () > 0 || + connector.get_number_sessions () > 0 ) + ACE_OS::sleep (1); + +#if 0 + // Cancel all pending AIO on Connector and Senders + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Cancel Connector/Senders: sessions_=%d\n"), + connector.get_number_sessions () + )); + connector.cancel_all (); +#endif + + //Cancel all pending AIO on Acceptor And Receivers + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Cancel Acceptor/Receivers:sessions_=%d\n"), + acceptor.get_number_sessions () + )); + acceptor.cancel_all (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Stop Thread Pool Task\n") + )); + task1.stop (); + + // As Proactor event loop now is inactive it is safe to destroy all + // Senders + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Stop Connector/Senders: sessions_=%d\n"), + connector.get_number_sessions () + )); + connector.stop (); + + // As Proactor event loop now is inactive it is safe to destroy all + // Receivers + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Stop Acceptor/Receivers:sessions_=%d\n"), + acceptor.get_number_sessions () + )); + acceptor.stop (); + + //Print statistic + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf (bufs, + ACE_SIZE_T_FORMAT_SPECIFIER + ACE_TEXT ("(%ld)"), + connector.get_total_snd (), + connector.get_total_w ()); + + ACE_OS::sprintf (bufr, + ACE_SIZE_T_FORMAT_SPECIFIER + ACE_TEXT ("(%ld)"), + connector.get_total_rcv (), + connector.get_total_r ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"), + bufs, + bufr)); + + ACE_OS::sprintf (bufs, + ACE_SIZE_T_FORMAT_SPECIFIER + ACE_TEXT ("(%ld)"), + acceptor.get_total_snd (), + acceptor.get_total_w ()); + + ACE_OS::sprintf (bufr, + ACE_SIZE_T_FORMAT_SPECIFIER + ACE_TEXT ("(%ld)"), + acceptor.get_total_rcv (), + acceptor.get_total_r ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"), + bufs, + bufr)); + +#endif /* ACE_HAS_IPV6 */ + ACE_END_TEST; + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Asynch_Acceptor<Receiver>; +template class ACE_Asynch_Connector<Sender>; + +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate ACE_Asynch_Acceptor<Receiver> +#pragma instantiate ACE_Asynch_Connector<Sender> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + +#else + +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPv6")); + + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n") + ACE_TEXT ("Proactor_Test_IPv6 will not be run."))); + + ACE_END_TEST; + + return 0; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS */ diff --git a/tests/SOCK_Send_Recv_Test_IPV6.cpp b/tests/SOCK_Send_Recv_Test_IPV6.cpp new file mode 100644 index 00000000000..4ad3a7f2132 --- /dev/null +++ b/tests/SOCK_Send_Recv_Test_IPV6.cpp @@ -0,0 +1,392 @@ +// $Id$ +// =========================================================================== +/** + * @file SOCK_Send_Recv_Test_IPv6.cpp + * + * @brief This is a test of the <ACE_SOCK>'s various send and receive + * methods. + + * The test forks two processes or spawns two threads (depending upon + * the platform) and then executes client and server allowing them to + * connect and exchange data in ways designed to exercise the send + * and recv functions. Right now, it primarily tests the iov-like + * send and recv functions, but others should be added to completely + * cover the possible scenarios. + * + * @author Steve Huston <shuston@riverace.com> + * Brian Buesker <bbuesker@qualcomm.com> + */ +// ============================================================================ + +#include "test_config.h" +#include "ace/OS_NS_sys_wait.h" +#include "ace/Thread.h" +#include "ace/Thread_Manager.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" + +// Change to non-zero if test fails +static int Test_Result = 0; + +#if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS) + +// In test 3, a large amount of data is sent. The purpose is to overflow the +// TCP send window, causing the sender to block (it's a send_n). This value +// is the amount to send. The assumption is that no implementation has a +// receive window larger than 128K bytes. If one is found, this is the place +// to change it. +// For some odd reason, NT will try to send a single large buffer, but not +// multiple smaller ones that add up to the large size. +const size_t Test3_Send_Size = 4*1024; +const size_t Test3_Loops = 10; +const size_t Test3_Total_Size = Test3_Send_Size * Test3_Loops; + + +static void * +client (void *arg) +{ + ACE_INET_Addr *remote_addr = ACE_reinterpret_cast (ACE_INET_Addr *, + arg); + ACE_INET_Addr server_addr (remote_addr->get_port_number (), + ACE_IPV6_LOCALHOST); + + ACE_SOCK_Stream cli_stream; + ACE_SOCK_Connector con; + ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Connecting to port %d\n"), + server_addr.get_port_number())); + + // Initiate connection with server; don't wait forever + if (con.connect (cli_stream, + server_addr, + &timeout) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n"), + ACE_TEXT ("connection failed"))); + Test_Result = 1; + return 0; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) connected to %s\n"), + ACE_TEXT_CHAR_TO_TCHAR(server_addr.get_host_name ()))); + + //******************* TEST 1 ****************************** + // + // Do a iovec sendv - send the 255 byte buffer in 5 chunks. The + // server will verify that the correct data is sent, and that there + // is no more and no less. + + u_char buffer[255]; + size_t i; + ssize_t len; + + // The server will verify that this data pattern gets there intact. + + for (i = 0; i < sizeof buffer; ++i) + buffer[i] = ACE_static_cast (u_char, i); + + iovec iov[5]; + + iov[0].iov_base = ACE_reinterpret_cast (char *, &buffer[0]); + iov[0].iov_len = 50; + + iov[1].iov_base = ACE_reinterpret_cast (char *, &buffer[50]); + iov[1].iov_len = 25; + + iov[2].iov_base = ACE_reinterpret_cast (char *, &buffer[75]); + iov[2].iov_len = 150; + + iov[3].iov_base = ACE_reinterpret_cast (char *, &buffer[225]); + iov[3].iov_len = 29; + + iov[4].iov_base = ACE_reinterpret_cast (char *, &buffer[254]); + iov[4].iov_len = 1; + + len = cli_stream.sendv (iov, 5); + if (len == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n"), + ACE_TEXT ("Test 1, sendv failed"))); + Test_Result = 1; + } + else + ACE_ASSERT (len == 255); + + //******************* TEST 2 ****************************** + // + // The same data is coming back - receive it using recv (size_t n, + // ...) and compare it to the original data. + + u_char buffer2[255]; + // Give it a chance to get here + ACE_OS::sleep (2); + len = cli_stream.recv (4, + buffer2, + 150, + &buffer2[150], + 105); + if (len != 255) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p; len is %d, but should be 255!\n"), + len)); + } + ACE_ASSERT (len == 255); + + for (i = 0; i < 255; i++) + if (buffer2[i] != buffer[i]) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Test 2, rcvd byte %d is %d, not %d\n"), + i, buffer2[i], buffer[i])); + Test_Result = 1; + } + + //******************* TEST 3 ****************************** + // + // Do a send_n of a large size. The receive should sleep some to + // cause the data reception to be delayed, which will fill up the + // TCP window and cause send_n to block at some point. The particular + // case this tests only needs to be exercised if the socket is + // non-blocking, so set that first. + + ssize_t sent; + char buff[Test3_Send_Size]; + ACE_ASSERT (cli_stream.enable (ACE_NONBLOCK) != -1); + for (i = 0; i < Test3_Loops; ++i) + { + errno = 0; + sent = cli_stream.send_n (buff, sizeof (buff)); + if (sent != sizeof (buff) && errno != 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Test 3, pass %d, sent %d, %p\n"), + i, sent, ACE_TEXT ("error"))); + Test_Result = 1; // Fail + } + } + + cli_stream.close (); + + return 0; +} + +static void * +server (void *arg) +{ + ACE_SOCK_Acceptor *peer_acceptor = (ACE_SOCK_Acceptor *) arg; + ACE_SOCK_Stream sock_str; + ACE_INET_Addr cli_addr; + ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT); + + // Accept the connection over which the stream tests will run. + // Don't lock up if client doesn't connect + if (peer_acceptor->accept (sock_str, + &cli_addr, + &timeout) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n"), + ACE_TEXT ("accept"))); + Test_Result = 1; + return 0; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) client %s connected from %d\n"), + ACE_TEXT_CHAR_TO_TCHAR(cli_addr.get_host_name ()), + cli_addr.get_port_number ())); + + //******************* TEST 1 ****************************** + // + // Do a iovec recvv - the client should send 255 bytes, which we + // will be detected and read into a ACE-allocated buffer. Use a 5 + // second timeout to give the client a chance to send it all. + + ACE_OS::sleep (5); + + iovec iov[3]; + u_char buffer[255]; + ssize_t len; + int i; + + iov[0].iov_base = ACE_reinterpret_cast (char *, &buffer[0]); + iov[0].iov_len = 75; + + iov[1].iov_base = ACE_reinterpret_cast (char *, &buffer[75]); + iov[1].iov_len = 100; + + iov[2].iov_base = ACE_reinterpret_cast (char *, &buffer[175]); + iov[2].iov_len = 80; + + len = sock_str.recvv_n (iov, 3); + if (len == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n"), + ACE_TEXT ("Test 1, recvv failed"))); + Test_Result = 1; + } + + ACE_ASSERT (len == 255); + for (i = 0; i < 255; i++) + if (buffer[i] != i) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Test 1, rcvd byte %d is %d, not %d\n"), + i, + buffer[i], + i)); + Test_Result = 1; + } + + //******************* TEST 2 ****************************** + // + // Send the buffer back, using send (size_t n, ...) in 3 pieces. + + len = sock_str.send (6, + buffer, + 42, + &buffer[42], + 189, + &buffer[231], + 24); + ACE_ASSERT (len == 255); + + //******************* TEST 3 ****************************** + // + // The sender is testing send_n to make sure it blocks if the TCP + // window fills. So sleep here for a bit to avoid getting the data + // yet. Then just read and empty out the received data. + ACE_OS::sleep (8); + // Keep reading until the peer closes. + sock_str.disable (ACE_NONBLOCK); + ssize_t got = 1; + size_t total_recv = 0; + while (got != 0) + { + errno = 0; + got = sock_str.recv (buffer, sizeof (buffer)); + if (got < 0) + break; + total_recv += got; + } + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Test 3 received %d bytes\n"), + total_recv)); + + if (total_recv == Test3_Total_Size) + { + if (got != 0 || errno != 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Test 3 final recv status %d, expected 0\n"), + got)); + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), + ACE_TEXT ("expected errno == 0, instead"))); + Test_Result = 1; // Fail + } + } + else + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Test 3 expected %d %p\n"), + Test3_Total_Size, ACE_TEXT ("bytes"))); + Test_Result = 1; + } + + sock_str.close(); + + return 0; +} + +#endif /* !ACE_LACKS_FORK || ACE_HAS_THREADS */ + +static void +spawn (void) +{ + // Acceptor + ACE_SOCK_Acceptor peer_acceptor; + + // Create a server address. + ACE_INET_Addr server_addr; + + // Bind listener to any port and then find out what the port was. + if (peer_acceptor.open (ACE_Addr::sap_any, 0, AF_INET6) == -1 + || peer_acceptor.get_local_addr (server_addr) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n"), + ACE_TEXT ("open"))); + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) starting server at port %d\n"), + server_addr.get_port_number ())); + +#if !defined (ACE_LACKS_FORK) + switch (ACE_OS::fork ("child")) + { + case -1: + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n%a"), + ACE_TEXT ("fork failed"), + 1)); + /* NOTREACHED */ + case 0: + client (&server_addr); + ACE_OS::exit (0); + /* NOTREACHED */ + default: + server (ACE_reinterpret_cast (void *, + &peer_acceptor)); + ACE_OS::wait (); + } +#elif defined (ACE_HAS_THREADS) + if (ACE_Thread_Manager::instance ()->spawn + (ACE_THR_FUNC (server), + ACE_reinterpret_cast (void *, &peer_acceptor), + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n%a"), + ACE_TEXT ("thread create failed"), + 1)); + + if (ACE_Thread_Manager::instance ()->spawn + (ACE_THR_FUNC (client), + ACE_reinterpret_cast (void *, &server_addr), + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n%a"), + ACE_TEXT ("thread create failed"), + 1)); + + // Wait for the threads to exit. + ACE_Thread_Manager::instance ()->wait (); +#else + ACE_ERROR ((LM_INFO, + ACE_TEXT ("(%P|%t) ") + ACE_TEXT ("only one thread may be run ") + ACE_TEXT ("in a process on this platform\n"))); +#endif /* ACE_HAS_THREADS */ + + peer_acceptor.close (); + } +} + +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("SOCK_Send_Recv_Test_IPv6")); + +#if defined (ACE_HAS_IPV6) + spawn (); +#endif /* ACE_HAS_IPV6 */ + + ACE_END_TEST; + return Test_Result; +} diff --git a/tests/SOCK_Test_IPv6.cpp b/tests/SOCK_Test_IPv6.cpp new file mode 100644 index 00000000000..1ebd30f8721 --- /dev/null +++ b/tests/SOCK_Test_IPv6.cpp @@ -0,0 +1,281 @@ +// $Id$ +// ============================================================================ +/** + * @file SOCK_Test_IPv6.cpp + * + * @brief This is a test of the <ACE_SOCK_Acceptor> and + * <ACE_SOCK_Connector> classes. + * + * The test forks two processes or spawns two threads (depending upon + * the platform) and then executes client and server allowing them to + * connect and exchange data. + * + * @author Prashant Jain <pjain@cs.wustl.edu> + * Doug Schmidt <schmidt@cs.wustl.edu> + * Brian Buesker <bbuesker@qualcomm.com> + */ +// ============================================================================ + +#include "test_config.h" +#include "ace/OS_NS_sys_select.h" +#include "ace/OS_NS_sys_wait.h" +#include "ace/Thread.h" +#include "ace/Thread_Manager.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Handle_Set.h" + +static const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz"; + +static void * +client (void *arg) +{ + ACE_INET_Addr *remote_addr = (ACE_INET_Addr *) arg; + ACE_INET_Addr server_addr (remote_addr->get_port_number (), + ACE_IPV6_LOCALHOST); + ACE_SOCK_Stream cli_stream; + ACE_SOCK_Connector con; + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) starting non-blocking connect\n"))); + // Initiate timed, non-blocking connection with server. + + // Attempt a non-blocking connect to the server. + if (con.connect (cli_stream, server_addr, + (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + { + if (errno != EWOULDBLOCK) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("connection failed"))); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) starting timed connect\n"))); + + // Check if non-blocking connection is in progress, + // and wait up to ACE_DEFAULT_TIMEOUT seconds for it to complete. + ACE_Time_Value tv (ACE_DEFAULT_TIMEOUT); + + if (con.complete (cli_stream, &server_addr, &tv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("connection failed")), 0); + else + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) connected to %s\n"), + ACE_TEXT_CHAR_TO_TCHAR(server_addr.get_host_name ()))); + } + + if (cli_stream.disable (ACE_NONBLOCK) == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("disable"))); + + // Send data to server (correctly handles "incomplete writes"). + + for (const char *c = ACE_ALPHABET; *c != '\0'; c++) + if (cli_stream.send_n (c, 1) == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("send_n"))); + + // Explicitly close the writer-side of the connection. + if (cli_stream.close_writer () == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("close_writer"))); + + char buf[1]; + + // Wait for handshake with server. + if (cli_stream.recv_n (buf, 1) != 1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("recv_n"))); + + // Close the connection completely. + if (cli_stream.close () == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("close"))); + + return 0; +} + +static void * +server (void *arg) +{ + ACE_SOCK_Acceptor *peer_acceptor = (ACE_SOCK_Acceptor *) arg; + + if (peer_acceptor->enable (ACE_NONBLOCK) == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("enable"))); + + // Keep these objects out here to prevent excessive constructor + // calls... + ACE_SOCK_Stream new_stream; + ACE_INET_Addr cli_addr; + ACE_Handle_Set handle_set; + const ACE_Time_Value def_timeout (ACE_DEFAULT_TIMEOUT); + ACE_Time_Value tv (def_timeout); + + char buf[BUFSIZ]; + const char *t = ACE_ALPHABET; + + handle_set.reset (); + handle_set.set_bit (peer_acceptor->get_handle ()); + + int select_width; +# if defined (ACE_WIN64) + // This arg is ignored on Windows and causes pointer truncation + // warnings on 64-bit compiles. + select_width = 0; +# else + select_width = int (peer_acceptor->get_handle ()) + 1; +# endif /* ACE_WIN64 */ + int result = ACE_OS::select (select_width, + handle_set, + 0, 0, &tv); + ACE_ASSERT (tv == def_timeout); + + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("select")), 0); + else if (result == 0) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) select timed out, shutting down\n"))); + return 0; + } + + // Create a new ACE_SOCK_Stream endpoint (note automatic restart + // if errno == EINTR). + + while ((result = peer_acceptor->accept (new_stream, &cli_addr)) != -1) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) client %s connected from %d\n"), + ACE_TEXT_CHAR_TO_TCHAR(cli_addr.get_host_name ()), cli_addr.get_port_number ())); + + // Enable non-blocking I/O. + if (new_stream.enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("enable")), 0); + + handle_set.reset (); + handle_set.set_bit (new_stream.get_handle ()); + + // Read data from client (terminate on error). + int select_width; + for (ssize_t r_bytes; ;) + { +# if defined (ACE_WIN64) + // This arg is ignored on Windows and causes pointer truncation + // warnings on 64-bit compiles. + select_width = 0; +# else + select_width = int (new_stream.get_handle ()) + 1; +# endif /* ACE_WIN64 */ + if (ACE_OS::select (select_width, + handle_set, + 0, 0, 0) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("select")), 0); + + while ((r_bytes = new_stream.recv (buf, 1)) > 0) + { + ACE_ASSERT (*t == buf[0]); + t++; + } + + if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) reached end of input, connection closed by client\n"))); + + // Handshake back with client. + if (new_stream.send_n ("", 1) != 1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("send_n"))); + + // Close endpoint. + if (new_stream.close () == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("close"))); + return 0; + } + else if (r_bytes == -1) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) no input available, going back to reading\n"))); + else + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("recv_n")), 0); + } + } + } + + if (result == -1) + { + if (errno == EWOULDBLOCK) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) no connections available, shutting down\n"))); + else + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("accept"))); + } + + return 0; +} + +static void +spawn (void) +{ + // Acceptor + ACE_SOCK_Acceptor peer_acceptor; + + // Create a server address. + ACE_INET_Addr server_addr; + + // Bind listener to any port and then find out what the port was. + if (peer_acceptor.open (ACE_Addr::sap_any, 0, AF_INET6) == -1 + || peer_acceptor.get_local_addr (server_addr) == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("open"))); + else + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) starting server at port %d\n"), + server_addr.get_port_number ())); + +#if !defined (ACE_LACKS_FORK) + switch (ACE_OS::fork ("child")) + { + case -1: + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n%a"), + ACE_TEXT ("fork failed"), + 1)); + /* NOTREACHED */ + case 0: + client (&server_addr); + exit (0); + /* NOTREACHED */ + default: + server ((void *) &peer_acceptor); + ACE_OS::wait (); + } +#elif defined (ACE_HAS_THREADS) + if (ACE_Thread_Manager::instance ()->spawn + (ACE_THR_FUNC (server), + (void *) &peer_acceptor, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n%a"), + ACE_TEXT ("thread create failed"), + 1)); + + if (ACE_Thread_Manager::instance ()->spawn + (ACE_THR_FUNC (client), + (void *) &server_addr, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) %p\n%a"), + ACE_TEXT ("thread create failed"), + 1)); + + // Wait for the threads to exit. + ACE_Thread_Manager::instance ()->wait (); +#else + ACE_ERROR ((LM_INFO, + ACE_TEXT ("(%P|%t) ") + ACE_TEXT ("only one thread may be run ") + ACE_TEXT ("in a process on this platform\n"))); +#endif /* ACE_HAS_THREADS */ + + peer_acceptor.close (); + } +} + +int +run_main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("SOCK_Test_IPv6")); + +#if defined (ACE_HAS_IPV6) + spawn (); +#endif /* ACE_HAS_IPV6 */ + + ACE_END_TEST; + return 0; +} diff --git a/tests/tests.mpc b/tests/tests.mpc index cc9fcc318f0..5c16d10a432 100644 --- a/tests/tests.mpc +++ b/tests/tests.mpc @@ -713,6 +713,13 @@ project(SOCK Test) : acetest { } } +project(SOCK Dgram Test) : acetest { + exename = SOCK_Dgram_Test + Source_Files { + SOCK_Dgram_Test.cpp + } +} + project(SOCK Connector Test) : acetest { exename = SOCK_Connector_Test Source_Files { |