diff options
-rw-r--r-- | ChangeLog | 45 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 45 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 45 | ||||
-rw-r--r-- | THANKS | 3 | ||||
-rw-r--r-- | ace/Configuration_Import_Export.cpp | 2 | ||||
-rw-r--r-- | ace/Strategies_T.cpp | 2 | ||||
-rw-r--r-- | tests/Makefile | 1 | ||||
-rw-r--r-- | tests/Makefile.bor | 1 | ||||
-rw-r--r-- | tests/Proactor_Scatter_Gather_Test.cpp | 1445 | ||||
-rw-r--r-- | tests/Proactor_Test.cpp | 2 | ||||
-rw-r--r-- | tests/run_test.lst | 1 |
11 files changed, 1579 insertions, 13 deletions
diff --git a/ChangeLog b/ChangeLog index d7af6d45f21..9e289a308e6 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,41 @@ +Sat May 25 16:42:46 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> + + * tests/run_test.lst, + * tests/Makefile.bor, + * tests/Makefile: Added Proactor_Scatter_Gather_Test. + +Sat May 25 16:30:00 2002 Edan Ayal <edanayal@yahoo.com> + + * tests/Proactor_Scatter_Gather_Test.cpp: + Added a new test for the asynch scatter/gather I/O + functionality. It is currently supported (as the feature itself) + only under Win32 - actually NT4 SP2 and above. The test runs in + a single thread, and involves a single Sender, two Receivers and + a single Writer. The Sender async-reads (scattered) from a file + into chunks of <page size>. It async-sends (gathered) the odd + chunks to the first receiver over a stream, and the even chunks + to the second receiver over a different stream. The receivers + async-read (scattered) from the socket streams into chunks in + size of <page size>, and convey the data to the Writer. The + Writer reconstructs the file using async-write + (gathered). Finally, the reconstructed file is compared to the + original file to determine test success. The test therefore + covers both async scatter/gather stream I/O and async + scatter/gather file I/O. + +Sat May 25 14:52:06 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> + + * ace/Strategies_T.cpp (make_svc_handler): Fixed a typo where + dll_name_ was misspelled as shared_library_. Thanks to Nathan + Krasney <natan-k@actcom.co.il> for reporting this. + +Fri May 24 10:42:25 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> + + * ace/Configuration_Import_Export.cpp (squish): Added a + check for '\n' to ensure that empty lines are handled properly. + Thanks to Vladimir Chovanec <Vladimir.CHOVANEC@asset.sk> for + reporting this. + Sat May 25 15:09:42 2002 Steve Huston <shuston@riverace.com> * examples/C++NPv2/SLDex.{cpp mak}: @@ -79,7 +117,7 @@ Fri May 24 10:18:39 2002 Steve Huston <shuston@riverace.com> * ace/CDR_Stream.h (ACE_InputCDR): Added comments clarifying the behavior of the ctor that takes a const ACE_Message_Block *. - Thanks to Alexander Maack for motivating this. + Thanks to Alexander Maack for motivating this. * examples/C++NPv2/display_logfile.cpp (svc): Fix memory leak (thanks to Alexander Maack for reporting this) and don't lose @@ -117,8 +155,9 @@ Mon May 13 19:15:49 2002 Steve Huston <shuston@riverace.com> * netsvcs/servers/main.cpp: Move the reactor/signal initialization to be after the service loading. If the user specifies -b (be a - daemon) it closes all handles, including the reactor notification - pipe. Also added ACE_TEXT decorator to the string literals. + daemon) it closes all handles, including the reactor + notification pipe. Also added ACE_TEXT decorator to the string + literals. Fri May 24 13:29:05 UTC 2002 Craig Rodrigues <crodrigu@bbn.com> diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index d7af6d45f21..9e289a308e6 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,41 @@ +Sat May 25 16:42:46 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> + + * tests/run_test.lst, + * tests/Makefile.bor, + * tests/Makefile: Added Proactor_Scatter_Gather_Test. + +Sat May 25 16:30:00 2002 Edan Ayal <edanayal@yahoo.com> + + * tests/Proactor_Scatter_Gather_Test.cpp: + Added a new test for the asynch scatter/gather I/O + functionality. It is currently supported (as the feature itself) + only under Win32 - actually NT4 SP2 and above. The test runs in + a single thread, and involves a single Sender, two Receivers and + a single Writer. The Sender async-reads (scattered) from a file + into chunks of <page size>. It async-sends (gathered) the odd + chunks to the first receiver over a stream, and the even chunks + to the second receiver over a different stream. The receivers + async-read (scattered) from the socket streams into chunks in + size of <page size>, and convey the data to the Writer. The + Writer reconstructs the file using async-write + (gathered). Finally, the reconstructed file is compared to the + original file to determine test success. The test therefore + covers both async scatter/gather stream I/O and async + scatter/gather file I/O. + +Sat May 25 14:52:06 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> + + * ace/Strategies_T.cpp (make_svc_handler): Fixed a typo where + dll_name_ was misspelled as shared_library_. Thanks to Nathan + Krasney <natan-k@actcom.co.il> for reporting this. + +Fri May 24 10:42:25 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> + + * ace/Configuration_Import_Export.cpp (squish): Added a + check for '\n' to ensure that empty lines are handled properly. + Thanks to Vladimir Chovanec <Vladimir.CHOVANEC@asset.sk> for + reporting this. + Sat May 25 15:09:42 2002 Steve Huston <shuston@riverace.com> * examples/C++NPv2/SLDex.{cpp mak}: @@ -79,7 +117,7 @@ Fri May 24 10:18:39 2002 Steve Huston <shuston@riverace.com> * ace/CDR_Stream.h (ACE_InputCDR): Added comments clarifying the behavior of the ctor that takes a const ACE_Message_Block *. - Thanks to Alexander Maack for motivating this. + Thanks to Alexander Maack for motivating this. * examples/C++NPv2/display_logfile.cpp (svc): Fix memory leak (thanks to Alexander Maack for reporting this) and don't lose @@ -117,8 +155,9 @@ Mon May 13 19:15:49 2002 Steve Huston <shuston@riverace.com> * netsvcs/servers/main.cpp: Move the reactor/signal initialization to be after the service loading. If the user specifies -b (be a - daemon) it closes all handles, including the reactor notification - pipe. Also added ACE_TEXT decorator to the string literals. + daemon) it closes all handles, including the reactor + notification pipe. Also added ACE_TEXT decorator to the string + literals. Fri May 24 13:29:05 UTC 2002 Craig Rodrigues <crodrigu@bbn.com> diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index d7af6d45f21..9e289a308e6 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,41 @@ +Sat May 25 16:42:46 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> + + * tests/run_test.lst, + * tests/Makefile.bor, + * tests/Makefile: Added Proactor_Scatter_Gather_Test. + +Sat May 25 16:30:00 2002 Edan Ayal <edanayal@yahoo.com> + + * tests/Proactor_Scatter_Gather_Test.cpp: + Added a new test for the asynch scatter/gather I/O + functionality. It is currently supported (as the feature itself) + only under Win32 - actually NT4 SP2 and above. The test runs in + a single thread, and involves a single Sender, two Receivers and + a single Writer. The Sender async-reads (scattered) from a file + into chunks of <page size>. It async-sends (gathered) the odd + chunks to the first receiver over a stream, and the even chunks + to the second receiver over a different stream. The receivers + async-read (scattered) from the socket streams into chunks in + size of <page size>, and convey the data to the Writer. The + Writer reconstructs the file using async-write + (gathered). Finally, the reconstructed file is compared to the + original file to determine test success. The test therefore + covers both async scatter/gather stream I/O and async + scatter/gather file I/O. + +Sat May 25 14:52:06 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> + + * ace/Strategies_T.cpp (make_svc_handler): Fixed a typo where + dll_name_ was misspelled as shared_library_. Thanks to Nathan + Krasney <natan-k@actcom.co.il> for reporting this. + +Fri May 24 10:42:25 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> + + * ace/Configuration_Import_Export.cpp (squish): Added a + check for '\n' to ensure that empty lines are handled properly. + Thanks to Vladimir Chovanec <Vladimir.CHOVANEC@asset.sk> for + reporting this. + Sat May 25 15:09:42 2002 Steve Huston <shuston@riverace.com> * examples/C++NPv2/SLDex.{cpp mak}: @@ -79,7 +117,7 @@ Fri May 24 10:18:39 2002 Steve Huston <shuston@riverace.com> * ace/CDR_Stream.h (ACE_InputCDR): Added comments clarifying the behavior of the ctor that takes a const ACE_Message_Block *. - Thanks to Alexander Maack for motivating this. + Thanks to Alexander Maack for motivating this. * examples/C++NPv2/display_logfile.cpp (svc): Fix memory leak (thanks to Alexander Maack for reporting this) and don't lose @@ -117,8 +155,9 @@ Mon May 13 19:15:49 2002 Steve Huston <shuston@riverace.com> * netsvcs/servers/main.cpp: Move the reactor/signal initialization to be after the service loading. If the user specifies -b (be a - daemon) it closes all handles, including the reactor notification - pipe. Also added ACE_TEXT decorator to the string literals. + daemon) it closes all handles, including the reactor + notification pipe. Also added ACE_TEXT decorator to the string + literals. Fri May 24 13:29:05 UTC 2002 Craig Rodrigues <crodrigu@bbn.com> @@ -1334,7 +1334,7 @@ Olof Lindfors <olof.lindfors@protegrity.com> Tom Wagner <TomW@CoManage.net> Kyle Brost <Kyle.Brost@quest.com> Nicolas Vincent <Vincent.Nicolas@Radiometer.dk> -Jonathan Wackley <jwackley@legato.com> +Jonathan Wackley <jwackley@mountaincable.com> Jan Kalin <jan.kalin@zag.si> Andreas Huggel <huggel_andreas@isoftel.com> Alain Totouom <atotouom@gmx.de> @@ -1520,6 +1520,7 @@ Bill Tonseth <wtonseth@yahoo.com> Frank Pilhofer <fp@fpx.de> Eric Quere <Eric.Quere@med.ge.com> Keith Thornton <keith.thornton.kt@germany.agfa.com> +Nathan Krasney <natan-k@actcom.co.il> I would particularly like to thank Paul Stephenson, who worked with me at Ericsson in the early 1990's. Paul devised the recursive Makefile diff --git a/ace/Configuration_Import_Export.cpp b/ace/Configuration_Import_Export.cpp index 8e97896e2ea..8c24242ae01 100644 --- a/ace/Configuration_Import_Export.cpp +++ b/ace/Configuration_Import_Export.cpp @@ -598,7 +598,7 @@ ACE_Ini_ImpExp::squish (ACE_TCHAR *src) // Now start at the beginning and move over all whitespace. for (cp = src; - (*cp != '\0') && ((*cp == ' ') || (*cp == '\t')); + (*cp != '\0') && ((*cp == ' ') || (*cp == '\t') || (*cp == '\n')); cp++) continue; diff --git a/ace/Strategies_T.cpp b/ace/Strategies_T.cpp index 949e7a9e524..f8c3052e2f2 100644 --- a/ace/Strategies_T.cpp +++ b/ace/Strategies_T.cpp @@ -72,7 +72,7 @@ ACE_DLL_Strategy<SVC_HANDLER>::make_svc_handler (SVC_HANDLER *&sh) ACE_TRACE ("ACE_DLL_Strategy<SVC_HANDLER>::make_svc_handler"); // Open the shared library. - ACE_SHLIB_HANDLE handle = ACE_OS::dlopen (this->shared_library_); + ACE_SHLIB_HANDLE handle = ACE_OS::dlopen (this->dll_name_); // Extract the factory function. SVC_HANDLER *(*factory)(void) = diff --git a/tests/Makefile b/tests/Makefile index c5f188198ed..963e5d58979 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -66,6 +66,7 @@ BIN = Aio_Platform_Test \ Object_Manager_Test \ OrdMultiSet_Test \ OS_Test \ + Proactor_Scatter_Gather_Test \ Proactor_Test \ Proactor_Timer_Test \ Process_Mutex_Test \ diff --git a/tests/Makefile.bor b/tests/Makefile.bor index dcf8377a9ab..973aea332af 100644 --- a/tests/Makefile.bor +++ b/tests/Makefile.bor @@ -66,6 +66,7 @@ NAMES = \ OrdMultiSet_Test \ OS_Test \ Pipe_Test \ + Proactor_Scatter_Gather_Test \ Proactor_Test \ Proactor_Timer_Test \ Priority_Buffer_Test \ diff --git a/tests/Proactor_Scatter_Gather_Test.cpp b/tests/Proactor_Scatter_Gather_Test.cpp new file mode 100644 index 00000000000..556ffe79f7c --- /dev/null +++ b/tests/Proactor_Scatter_Gather_Test.cpp @@ -0,0 +1,1445 @@ +// $Id$ + +// ============================================================================ +/** + * @file Proactor_Scatter_Gather_Test.cpp + * + * The test runs on a single thread, and involves a single Sender, + * two Receivers and a single Writer. The Sender async-reads + * (scattered) from a file into chunks of <page size>. It + * async-sends (gathered) the odd chunks to the first receiver over a + * stream, and the even chunks to the second receiver over a + * different stream. The receivers async-read (scattered) from the + * socket streams into chunks in size of <page size>, and convey the + * data to the Writer. The Writer reconstructs the file using + * async-write (gathered). Then, the reconstructed file is compared + * to the original file to determine test success. So, It covers both + * async scatter/gather stream I/O and async scatter/gather file I/O. + * The wire transfer protocol is very naive (and totally non + * reliable...) - when both connections are closed, EOF is assumed. + * The test can be also run in a seperated sender and receiver mode, + * to test real network influences. + * + * This test is based upon some building blocks taken from the + * Proactor_Test.cpp. + * + * @author Edan Ayal <edanayal@yahoo.com> */ +// ============================================================================ + +#include "test_config.h" + +#if ((defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && !defined (ACE_HAS_WINCE)) + // This currently only works on Win32 platforms (NT SP2 and above). + // Support for Unix platforms supporting POSIX aio calls should be added in future. + +#include "ace/Get_Opt.h" + +#include "ace/Proactor.h" +#include "ace/Asynch_Acceptor.h" +#include "ace/Asynch_Connector.h" + +#include "ace/SOCK_Connector.h" + +// For the Acceptor/Connector handlers maintenance lists +static const int SENDERS = 1; +static const int RECEIVERS = 2; + +// Port that we're receiving connections on. +static u_short port = ACE_DEFAULT_SERVER_PORT; + +static ACE_TCHAR *host = ACE_LOCALHOST; + +// File that we're sending. +static ACE_TCHAR *input_file = ACE_TEXT("Proactor_Scatter_Gather_Test.cpp"); + +// Name of the output file. +static ACE_TCHAR *output_file = ACE_TEXT("output"); + +static bool client_only = false; +static bool server_only = false; +static size_t chunk_size = 0; + +enum +{ + ODD = 0, + EVEN +}; + +// ************************************************************* +// Some chunks chain helper routines +// ************************************************************* +static int allocate_chunks_chain (ACE_Message_Block *&head_mb, + size_t number_of_chunks) +{ + ACE_Message_Block *pre_mb = 0; + + for (size_t index = 0; index < number_of_chunks; ++index) + { +#if defined (ACE_WIN32) + void *addr = ::VirtualAlloc (0, + chunk_size, + MEM_COMMIT, + PAGE_READWRITE); +#else + void *addr = new char[chunk_size]; +#endif /* ACE_WIN32 */ + if (addr) + { + ACE_Message_Block *mb = new ACE_Message_Block (ACE_static_cast (char *, addr), + chunk_size); + if (!head_mb) + head_mb = mb; + + // chain them together + if (pre_mb) + pre_mb->cont (mb); + pre_mb = mb; + } + else + { + ACE_ASSERT (0); + return -1; + } + } + + return 0; +} + +static void +free_chunks_chain (ACE_Message_Block *&mb) +{ + for (const ACE_Message_Block* msg = mb; + msg != 0; + msg = msg->cont ()) + { +#if defined (ACE_WIN32) + ::VirtualFree (msg->base (), + msg->size (), + MEM_DECOMMIT); +#else + delete [] msg->base (); +#endif /* ACE_WIN32 */ + } + + mb->release (); + mb = 0; +} + +static int +last_chunk (ACE_Message_Block *chain, + ACE_Message_Block *&last) +{ + if (!chain) + return 0; + + size_t index = 1; + last = chain; + while (0 != last->cont ()) + { + last = last->cont (); + ++index; + } + + return index; +} + +static void +merge_odd_even_chains (ACE_Message_Block *odd_mb, + ACE_Message_Block *even_mb) +{ + ACE_Message_Block *pre_pre_mb = odd_mb; + ACE_Message_Block *pre_mb = even_mb; + ACE_Message_Block *curr_mb = odd_mb->cont (); + + if (even_mb) + { + for (; curr_mb != 0; curr_mb = pre_pre_mb->cont ()) + { + pre_pre_mb->cont (pre_mb); + + // increment history pointers + pre_pre_mb = pre_mb; + pre_mb = curr_mb; + } + + pre_pre_mb->cont (pre_mb); + pre_mb->cont (0); + } +} + +static void +split_odd_even_chains (ACE_Message_Block *odd_mb, + ACE_Message_Block *even_mb) +{ + ACE_Message_Block *pre_pre_mb = odd_mb; + ACE_Message_Block *pre_mb = even_mb; + ACE_Message_Block *curr_mb = (even_mb ? even_mb->cont () : 0); + + for (; curr_mb != 0; curr_mb = curr_mb->cont ()) + { + pre_pre_mb->cont (curr_mb); + + // increment history pointers + pre_pre_mb = pre_mb; + pre_mb = curr_mb; + } + + pre_pre_mb->cont (0); + if (pre_mb) + pre_mb->cont (0); +} + +static void +add_to_chunks_chain (ACE_Message_Block *&chunks_chain, + ACE_Message_Block *additional_chunks_chain) +{ + if (0 == chunks_chain) + chunks_chain = additional_chunks_chain; + else + { + ACE_Message_Block *last = 0; + last_chunk (chunks_chain, last); + if (last) + last->cont (additional_chunks_chain); + } +} + +static void +remove_empty_chunks (ACE_Message_Block *&chunks_chain) +{ + if (0 == chunks_chain) + return; + + ACE_Message_Block *first_empty = chunks_chain; + ACE_Message_Block *pre_mb = 0; + + while (first_empty->length () > 0 && + 0 != first_empty->cont ()) + { + pre_mb = first_empty; + first_empty = first_empty->cont (); + } + + // break the chain there, and release the empty end (might be everything) + if (0 == first_empty->length ()) + { + if (pre_mb) // might be 0, in case it's the entire chain + pre_mb->cont (0); + + if (first_empty == chunks_chain) + chunks_chain = 0; + + free_chunks_chain (first_empty); + } +} + +// ************************************************************* +// Acceptor, Receiver and Writer +// ************************************************************* +class Receiver; + +class Acceptor : public ACE_Asynch_Acceptor<Receiver> +{ + friend class Receiver; + +public: + Acceptor (void); + virtual ~Acceptor (void); + + void stop (void); + + // Virtual from ACE_Asynch_Acceptor + virtual Receiver *make_handler (void); + + int get_number_sessions (void) { return this->sessions_; } + +private: + void on_new_receiver (Receiver &rcvr); + void on_delete_receiver (Receiver &rcvr); + + int sessions_; + Receiver *list_receivers_[RECEIVERS]; +}; + +class Writer; + +// The first instantiated take the role of the odd receiver +class Receiver : public ACE_Service_Handler +{ + friend class Acceptor; + friend class Writer; + +public: + Receiver (Acceptor *acceptor = 0, int index = -1); + virtual ~Receiver (void); + + /// This is called after the new connection has been accepted. + virtual void open (ACE_HANDLE handle, + ACE_Message_Block &message_block); +protected: + /// This is called by the framework when asynchronous <read> operation from the + /// socket completes. + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + +private: + int initiate_read_stream (void); + + void check_destroy (void); + + Acceptor *acceptor_; + int index_; + + // Socket input + ACE_Asynch_Read_Stream rs_; + ACE_HANDLE socket_handle_; + + // Writer + static Writer* writer_; + + long io_count_; + + char odd_; + + // if we get non-page-size reminder, we will not send it to the writer + // until it is full (unless at end) + ACE_Message_Block *partial_chunk_; +}; + +class Writer : public ACE_Service_Handler +{ + friend class Receiver; + +public: + Writer (void); + virtual ~Writer (void); + + void open (void); + + // this is *not* a callback from the framework + int handle_read_chunks_chain (ACE_Message_Block *mb, + int type); + + // for determining when last receiver dies + void on_new_receiver (); + void on_delete_receiver (); + +protected: + /// This is called by the framework when an asynchronous <write> to the file + /// completes. + virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result); + +private: + int initiate_write_file (void); + +private: + // Output file + ACE_Asynch_Write_File wf_; + ACE_HANDLE output_file_handle_; + size_t writing_file_offset_; + size_t reported_file_offset_; + ACE_Message_Block *odd_chain_; + ACE_Message_Block *even_chain_; + long io_count_; + char receiver_count_; +}; + +// ************************************************************* +// Receiver Impl +// ************************************************************* + +Writer *Receiver::writer_ = 0; + +Receiver::Receiver (Acceptor * acceptor, int index) + : acceptor_ (acceptor), + index_ (index), + socket_handle_ (ACE_INVALID_HANDLE), + io_count_ (0), + partial_chunk_ (0) +{ + // the first one is the odd one + this->odd_ = ((0 == index) ? 1 : 0); + + if (this->odd_) + { + Receiver::writer_ = new Writer; + if (!Receiver::writer_) + { + ACE_ASSERT (0); + return; + } + } + + Receiver::writer_->on_new_receiver (); + + if (this->acceptor_ != 0) + this->acceptor_->on_new_receiver (*this); +} + +Receiver::~Receiver (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Receiver::~Receiver\n"))); + + if (this->acceptor_ != 0) + this->acceptor_->on_delete_receiver (*this); + + if (this->socket_handle_ != ACE_INVALID_HANDLE) + ACE_OS::closesocket (this->socket_handle_); + + Receiver::writer_->on_delete_receiver (); + + if (this->partial_chunk_) + { + ACE_ASSERT (0); // should not be getting here + this->partial_chunk_->release (); + } +} + +void +Receiver::check_destroy (void) +{ + if (this->io_count_ <= 0) + delete this; +} + +void +Receiver::open (ACE_HANDLE handle, ACE_Message_Block &) +{ + this->socket_handle_ = handle; + + // Open the ACE_Asynch_Read_Stream + if (this->rs_.open (*this, this->socket_handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open"))); + else + { + if (this->odd_) + Receiver::writer_->open (); + + this->initiate_read_stream (); + } + + this->check_destroy (); +} + +int +Receiver::initiate_read_stream (void) +{ + if (!Receiver::writer_) + return -1; + + // how many chunks to allocate? + size_t number_of_new_chunks = (this->partial_chunk_ ? + (ACE_IOV_MAX / RECEIVERS) - 1 + : ACE_IOV_MAX / RECEIVERS); + + // allocate chunks chain + ACE_Message_Block *head_mb = 0; + if (-1 == allocate_chunks_chain (head_mb, number_of_new_chunks)) + { + ACE_ASSERT (0); + return -1; + } + + // calculate how many bytes to read + + // head_mb could be 0 (no new chunks allocated) + size_t bytes_to_read = head_mb ? head_mb->total_size () : 0; + + // add the partial chunk at the front if appropriate, and update + // the number of bytes to read + if (this->partial_chunk_) + { + bytes_to_read += this->partial_chunk_->space (); + this->partial_chunk_->cont (head_mb); + head_mb = this->partial_chunk_; + this->partial_chunk_ = 0; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Receiver::initiate_read_stream - (%s) readv %d\n"), + this->odd_ ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"), + bytes_to_read)); + + // perform the actual scattered read + if (this->rs_.readv (*head_mb, + bytes_to_read) == -1) + { + free_chunks_chain (head_mb); + + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")), + -1); + } + + ++this->io_count_; + return 0; +} + +void +Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + ACE_Message_Block *mb = &result.message_block (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Receiver::handle_read_stream - (%s) read %d\n"), + this->odd_ ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"), + result.bytes_transferred ())); + + // Transfer only complete chunks to the writer. + // Save last partial chunk for the next call. + // On disconnect (error or 0 transferred), transfer whatever we have. + + // at this stage there should not be anything there + ACE_ASSERT (!this->partial_chunk_); + + // first, remove the empty chunks + remove_empty_chunks (mb); + + if (mb && Receiver::writer_) + { // there's something to write, and who to write to + + // write everything or only complete chunks? + + // write everything - when no new bytes were transferred + bool write_everything = !result.bytes_transferred (); + if (write_everything) + Receiver::writer_->handle_read_chunks_chain (mb, + this->odd_ ? ODD : EVEN); + else + { // filter out the partial chunk at the end (if present) + // and save it for later before writing the full chunks + + // have this->partial_chunk_ point to the last chunk in the chain + size_t last_index = last_chunk (mb, this->partial_chunk_); + if (this->partial_chunk_ && + this->partial_chunk_->length () < chunk_size) + { // found partial chunk at end of chain + // detach it from the chain + if (last_index > 1) // chain bigger than 1 + { + ACE_Message_Block *pre_last = mb; + for (size_t index = 1; index < last_index - 1; ++index) + pre_last = pre_last->cont (); + + // detach partial chunk from chain + pre_last->cont (0); + } + else + // chain in length of 1 - so we need to zero mb + mb = 0; + } + else // last is a full chunk, so hand it over with the rest + this->partial_chunk_ = 0; + + // transfer (if there's anything left) + if (mb && mb->total_length ()) + Receiver::writer_->handle_read_chunks_chain ( + mb, + this->odd_ ? ODD : EVEN); + + // initiate more reads only if no error + if (!result.error ()) + this->initiate_read_stream (); + else + ACE_ASSERT (0); + } + } + else if (mb && !Receiver::writer_) + // no one to write to + free_chunks_chain (mb); + + --this->io_count_; + + this->check_destroy (); +} + +// ************************************************************* +// Acceptor Impl +// ************************************************************* + +Acceptor::Acceptor (void) + : sessions_ (0) +{ + for (int i = 0; i < RECEIVERS; ++i) + this->list_receivers_[i] = 0; +} + +Acceptor::~Acceptor (void) +{ + this->stop (); +} + + +void +Acceptor::stop (void) +{ + // This method can be called only after proactor event loop is done + // in all threads. + for (int i = 0; i < RECEIVERS; ++i) + { + delete this->list_receivers_[i]; + this->list_receivers_[i] = 0; + } +} + +void +Acceptor::on_new_receiver (Receiver & rcvr) +{ + ++this->sessions_; + this->list_receivers_[rcvr.index_] = &rcvr; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"), + this->sessions_)); +} + +void +Acceptor::on_delete_receiver (Receiver & rcvr) +{ + --this->sessions_; + if (rcvr.index_ >= 0 + && rcvr.index_ < RECEIVERS + && this->list_receivers_[rcvr.index_] == &rcvr) + this->list_receivers_[rcvr.index_] = 0; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"), + this->sessions_)); +} + +Receiver * +Acceptor::make_handler (void) +{ + if (this->sessions_ >= RECEIVERS) + return 0; + + for (int i = 0; i < RECEIVERS; ++i) + { + if (this->list_receivers_[i] == 0) + { + ACE_NEW_RETURN (this->list_receivers_[i], + Receiver (this, i), + 0); + return this->list_receivers_[i]; + } + } + + return 0; +} + +// ************************************************************* +// Writer Impl +// ************************************************************* + +Writer::Writer (void) +: output_file_handle_ (ACE_INVALID_HANDLE), + writing_file_offset_ (0), + reported_file_offset_ (0), + odd_chain_ (0), + even_chain_ (0), + io_count_ (0), + receiver_count_ (0) +{ +} + +Writer::~Writer (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Writer::~Writer\n"))); + + if (this->output_file_handle_ != ACE_INVALID_HANDLE) + ACE_OS::close (this->output_file_handle_); + + Receiver::writer_ = 0; +} + +void +Writer::on_new_receiver () +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Writer::on_new_receiver\n"))); + + ++this->receiver_count_; +} + +void +Writer::on_delete_receiver () +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Writer::on_delete_receiver\n"))); + + --this->receiver_count_; + + if (0 == this->receiver_count_) + { + if (this->io_count_ <= 0) + // no pending io, so do the work oursleves + // (if pending io, they'll see the zero receiver count) + this->initiate_write_file (); + } +} + +void +Writer::open (void) +{ + // Open the file for output + if (ACE_INVALID_HANDLE == (this->output_file_handle_ = ACE_OS::open (output_file, + O_CREAT | _O_TRUNC | _O_WRONLY |\ + FILE_FLAG_OVERLAPPED |\ + FILE_FLAG_NO_BUFFERING, + ACE_DEFAULT_FILE_PERMS))) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Writer::open::ACE_OS::open"))); + // Open the ACE_Asynch_Write_File + else if (this->wf_.open (*this, this->output_file_handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Writer::open::ACE_Asynch_Write_File::open"))); +} + +int +Writer::handle_read_chunks_chain (ACE_Message_Block *mb, + int type) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Writer::handle_read_chunks_chain - (%s) %d bytes\n"), + (type == ODD) ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"), + mb->total_length ())); + + add_to_chunks_chain (ODD == type ? this->odd_chain_ : this->even_chain_, mb); + + this->initiate_write_file (); + + return 0; +} + +int +Writer::initiate_write_file (void) +{ + // find out how much can we merge + ACE_Message_Block *dummy_last = 0; + size_t odd_count = last_chunk (this->odd_chain_, dummy_last); + size_t even_count = last_chunk (this->even_chain_, dummy_last); + + size_t merge_size = ACE_MIN (ACE_MIN (odd_count, even_count), + (size_t) ACE_IOV_MAX); + + // the options here are as follows: + // io_count_ can be zero or greater. + // merge_size can be zero or not. + // if non zero merge, write the merge. ASSERT receiver_count_ is non zero too. + // if zero merge: + // if receiver_count_ is non zero, NOOP. + // if zero receiver_count_, we should write whatever is left, + // and terminate the writer at completion. + // if nothing to write, and io_count_ is zero too, terminate here. + + if (0 == merge_size && + 0 != this->receiver_count_) + return 0; + + if (0 == merge_size && + 0 == this->receiver_count_ && + 0 == odd_count && + 0 == even_count && + 0 == this->io_count_) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Writer::initiate_write_file") + ACE_TEXT (" - ending proactor event loop\n"))); + + ACE_Proactor::instance ()->end_event_loop (); + + delete this; + + return 0; + } + + // if we reached nere and merge_size is zero, we should write whatever is + // in the queues (1 to 2 chunks together), so let's force the merge size to 1. + if (0 == merge_size) + { + ACE_ASSERT (1 == odd_count && 1 >= even_count); + merge_size = 1; + } + + // Now that we found out what we want to do, prepare the chain + // that will be written, and update the remainders + ACE_Message_Block *new_odd_chain_head = this->odd_chain_; + ACE_Message_Block *new_even_chain_head = this->even_chain_; + + // locate the place for detachment in the chains + ACE_Message_Block *pre_odd = 0; + ACE_Message_Block *pre_even = 0; + for (size_t index = 0; index < merge_size; ++index) + { + pre_odd = new_odd_chain_head; + if (new_odd_chain_head) + new_odd_chain_head = new_odd_chain_head->cont (); + pre_even = new_even_chain_head; + if (new_even_chain_head) + new_even_chain_head = new_even_chain_head->cont (); + } + // now detach the chain + if (pre_odd) + pre_odd->cont (0); + if (pre_even) + pre_even->cont (0); + + // perform merge between the two chains + merge_odd_even_chains (this->odd_chain_, this->even_chain_); + + // and now finally perform the write + ACE_Message_Block *united_mb = this->odd_chain_; + size_t increment_writing_file_offset = united_mb->total_length (); + + // Reconstruct the file + // Write the size, not the length, because we must write in chunks of <page size> + if (this->wf_.writev (*united_mb, + united_mb->total_size (), + this->writing_file_offset_) == -1) + { + free_chunks_chain (united_mb); + + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Writer::initiate_write_file::ACE_Asynch_Write_Stream::writev")), + -1); + } + + // we update now because otherwise, we'd have error when performing + // pipelined writing (that is, mulitple calls to write before the callbacks + // to handle_x) + this->writing_file_offset_ += increment_writing_file_offset; + + // update the remainders of the chains + this->odd_chain_ = new_odd_chain_head; + this->even_chain_ = new_even_chain_head; + + ++this->io_count_; + return 0; +} + +void +Writer::handle_write_file (const ACE_Asynch_Write_File::Result &result) +{ + ACE_Message_Block *mb = &result.message_block (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Writer::handle_write_file - offset %d + %d\n"), + this->reported_file_offset_, + result.bytes_transferred ())); + + this->reported_file_offset_ += result.bytes_transferred (); + + // Always truncate as required, + // because partial will always be the last write to a file + ACE_Message_Block *last_mb = mb; + last_chunk (mb, last_mb); + + if (last_mb->space ()) + ACE_OS::truncate (output_file, this->reported_file_offset_ - last_mb->space ()); + + free_chunks_chain (mb); + + --this->io_count_; + + // end of process? + if (0 == this->receiver_count_ && + 0 == this->io_count_) + { + ACE_ASSERT (0 == this->odd_chain_ && 0 == this->even_chain_); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Writer::handle_write_file"), + ACE_TEXT (" - ending proactor event loop\n"))); + + ACE_Proactor::instance ()->end_event_loop (); + + delete this; + } +} + +// ************************************************************* +// Connector and Sender +// ************************************************************* +class Sender; + +class Connector : public ACE_Asynch_Connector<Sender> +{ + friend class Sender; + +public: + Connector (void); + virtual ~Connector (void); + + void stop (void); + + // Virtual from ACE_Asynch_Connector + virtual Sender *make_handler (void); + +private: + void on_new_sender (Sender &rcvr); + void on_delete_sender (Sender &rcvr); + + int sessions_; + Sender *list_senders_[SENDERS]; +}; + +class Sender : public ACE_Service_Handler +{ + friend class Connector; +public: + + Sender (Connector *connector = 0, int index = -1); + + virtual ~Sender (void); + + /// This is called after the new connection has been established. + virtual void open (ACE_HANDLE handle, + ACE_Message_Block &message_block); + + // This is called by the framework when asynchronous reads from the file complete + virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result); + + // This is called by the framework when asynchronous writes from the socket complete + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + +private: + void check_destroy (void); + + int initiate_read_file (void); + + int initiate_write_stream (ACE_Message_Block &mb); + + int index_; + Connector * connector_; + + // File to read from + ACE_Asynch_Read_File rf_; + ACE_HANDLE input_file_handle_; + size_t file_offset_; + + // Sockets to send to + // odd and even socket output streams + ACE_Asynch_Write_Stream ws_[RECEIVERS]; + ACE_HANDLE socket_handle_[RECEIVERS]; + + long io_count_; +}; + +// ************************************************************* +// Connector Impl +// ************************************************************* + +Connector::Connector (void) + : sessions_ (0) +{ + for (int i = 0; i < SENDERS; ++i) + this->list_senders_[i] = 0; +} + +Connector::~Connector (void) +{ + this->stop (); +} + +void +Connector::stop (void) +{ + // This method can be called only after proactor event loop is done + // in all threads. + + for (int i = 0; i < SENDERS; ++i) + { + delete this->list_senders_[i]; + this->list_senders_[i] = 0; + } +} + +void +Connector::on_new_sender (Sender &sndr) +{ + ++this->sessions_; + this->list_senders_[sndr.index_] = &sndr; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sender::CTOR sessions_ = %d\n"), + this->sessions_)); +} + +void +Connector::on_delete_sender (Sender &sndr) +{ + --this->sessions_; + if (sndr.index_ >= 0 + && sndr.index_ < SENDERS + && this->list_senders_[sndr.index_] == &sndr) + this->list_senders_[sndr.index_] = 0; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sender::~DTOR sessions_ = %d\n"), + this->sessions_)); +} + +Sender * +Connector::make_handler (void) +{ + if (this->sessions_ >= SENDERS) + return 0; + + for (int i = 0; i < SENDERS; ++i) + { + if (this->list_senders_ [i] == 0) + { + ACE_NEW_RETURN (this->list_senders_[i], + Sender (this, i), + 0); + return this->list_senders_[i]; + } + } + + return 0; +} + +// ************************************************************* +// Sender Impl +// ************************************************************* + +Sender::Sender (Connector * connector, int index) + : index_ (index), + connector_ (connector), + input_file_handle_ (ACE_INVALID_HANDLE), + file_offset_ (0), + io_count_ (0) +{ + socket_handle_[ODD] = socket_handle_[EVEN] = ACE_INVALID_HANDLE; + + if (this->connector_ != 0) + this->connector_->on_new_sender (*this); +} + +Sender::~Sender (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sender::~Sender\n"))); + + if (this->connector_ != 0) + this->connector_->on_delete_sender (*this); + + if (this->socket_handle_[ODD] != ACE_INVALID_HANDLE) + ACE_OS::closesocket (this->socket_handle_[ODD]); + + if (this->socket_handle_[EVEN] != ACE_INVALID_HANDLE) + ACE_OS::closesocket (this->socket_handle_[EVEN]); + + if (this->input_file_handle_ != ACE_INVALID_HANDLE) + ACE_OS::close (this->input_file_handle_); + + if (client_only) + ACE_Proactor::instance ()->end_event_loop (); +} + +// return true if we alive, false we commited suicide +void +Sender::check_destroy (void) +{ + if (this->io_count_ <= 0) + delete this; +} + +void +Sender::open (ACE_HANDLE handle, ACE_Message_Block &) +{ + this->socket_handle_[ODD] = handle; + + // Open the input file + if (ACE_INVALID_HANDLE == (this->input_file_handle_ = ACE_OS::open (input_file, + _O_RDONLY |\ + FILE_FLAG_OVERLAPPED |\ + FILE_FLAG_NO_BUFFERING, + ACE_DEFAULT_FILE_PERMS))) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::open::ACE_OS::open"))); + } + else + { + // now connect (w/o the connector factory) to the even (=second) receiver: + // we don't connect thru the factory in order not to instantiate another Sender + ACE_SOCK_Connector sock_connector; + ACE_SOCK_Stream sock_stream; + if (-1 == sock_connector.connect (sock_stream, + ACE_INET_Addr (port, host))) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::open::ACE_SOCK_Connector::connect"))); + + else + { + this->socket_handle_[EVEN] = sock_stream.get_handle (); + + // Open odd ACE_Asynch_Write_Stream + if (this->ws_[ODD].open (*this, this->socket_handle_[ODD]) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open"))); + + // Open even ACE_Asynch_Write_Stream + else if (this->ws_[EVEN].open (*this, this->socket_handle_[EVEN]) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open"))); + + // Open ACE_Asynch_Read_File + else if (this->rf_.open (*this, this->input_file_handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::open::ACE_Asynch_Read_File::open"))); + else + // Start an asynchronous read + this->initiate_read_file (); + } + } + + this->check_destroy (); +} + +int +Sender::initiate_read_file (void) +{ + ACE_ASSERT (0 == this->file_offset_ % chunk_size); + + static const size_t file_size = ACE_OS::filesize (input_file); + + static const size_t number_of_chunks_needed_for_file + = ACE_OS::ceil ((double) file_size / chunk_size); + + size_t relevant_number_of_chunks = + ACE_MIN ((size_t)ACE_IOV_MAX, + number_of_chunks_needed_for_file - (this->file_offset_ / chunk_size)); + + if (!relevant_number_of_chunks) + { + ACE_ASSERT (0); // Just 2 C it coming + return 0; + } + + ACE_Message_Block *head_mb = 0; + if (-1 == allocate_chunks_chain (head_mb, relevant_number_of_chunks)) + { + ACE_ASSERT (0); + return -1; + } + + // Inititiate read + if (this->rf_.readv (*head_mb, + head_mb->total_size (), + this->file_offset_) == -1) + { + free_chunks_chain (head_mb); + + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::initiate_read_file::") + ACE_TEXT ("ACE_Asynch_Read_Stream::readv")), + -1); + } + + ++this->io_count_; + return 0; +} + +int +Sender::initiate_write_stream (ACE_Message_Block &mb) +{ + // send the odd to the first connection, and the even to the second + // connection. + + ACE_Message_Block *odd_mb = &mb; + ACE_Message_Block *even_mb = mb.cont (); + + split_odd_even_chains (odd_mb, even_mb); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sender::initiate_write_stream - (ODD ) writev %d\n"), + odd_mb->total_length ())); + + if (this->ws_[ODD].writev (*odd_mb, odd_mb->total_length ()) == -1) + { + free_chunks_chain (odd_mb); + + if (even_mb) + free_chunks_chain (even_mb); + + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")), + -1); + } + + ++this->io_count_; + + if (even_mb) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sender::initiate_write_stream - (EVEN) writev %d\n"), + even_mb->total_length ())); + + if (this->ws_[EVEN].writev (*even_mb, even_mb->total_length ()) == -1) + { + free_chunks_chain (even_mb); + + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")), + -1); + } + + ++this->io_count_; + } + + return 0; +} + +void +Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result) +{ + ACE_Message_Block *mb = &result.message_block (); + + if (result.error () == 0 && result.bytes_transferred () != 0) + { + size_t bytes_transferred = result.bytes_transferred (); + size_t chunks_chain_size = mb->total_size (); + + this->file_offset_ += bytes_transferred; + + this->initiate_write_stream (*mb); + + // and read more if required + if (bytes_transferred == chunks_chain_size) + this->initiate_read_file (); + } + else + free_chunks_chain (mb); + + --this->io_count_; + + this->check_destroy (); +} + +void +Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + ACE_Message_Block *mb = &result.message_block (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sender::handle_write_stream - wrote %d bytes\n"), + result.bytes_transferred ())); + + if (result.error () == 0 && result.bytes_transferred () != 0) + // verify sent all + ACE_ASSERT (0 == mb->total_length ()); + else + ACE_ASSERT (0); + + free_chunks_chain (mb); + + --this->io_count_; + + this->check_destroy (); +} + +// ************************************************************* +// Configuration helpers +// ************************************************************* +int +print_usage (int /* argc */, ACE_TCHAR *argv[]) +{ + ACE_ERROR + ((LM_ERROR, + ACE_TEXT ("\nusage: %s") + ACE_TEXT ("\n-f <input file>\n") + ACE_TEXT ("\n-c client only (reader-sender)") + ACE_TEXT ("\n-s server only (receiver-writer)") + ACE_TEXT ("\n-h host to connect to") + ACE_TEXT ("\n-p port") + ACE_TEXT ("\n-u show this message") + ACE_TEXT ("\n"), + argv[0] + )); + return -1; +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + if (argc == 1) // no arguments , so one button test + return 0; + + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("f:csh:p:u")); + int c; + + while ((c = get_opt ()) != EOF) + { + switch (c) + { + case 'f': + input_file = get_opt.opt_arg (); + break; + case 'c': + client_only = true; + server_only = false; + break; + case 's': + server_only = true; + client_only = false; + break; + case 'h': + host = get_opt.opt_arg (); + break; + case 'p': + port = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'u': + default: + return print_usage (argc, argv); + } // switch + } // while + + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test")); + + if (::parse_args (argc, argv) == -1) + return -1; + + if (client_only) + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Running as client only\n"))); + else if (server_only) + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Running as server only\n"))); + else + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Running as server and client\n"))); + + chunk_size = ACE_OS::getpagesize (); + + Acceptor acceptor; + Connector connector; + + if (!server_only) + { + if (-1 == connector.open (1, ACE_Proactor::instance ())) + { + ACE_ASSERT (0); + return -1; + } + + // connect to first destination + if (-1 == connector.connect (ACE_INET_Addr (port, host))) + { + ACE_ASSERT (0); + return -1; + } + } + + if (!client_only) + { + // Simplify, initial read with zero size + if (-1 == acceptor.open (ACE_INET_Addr (port), 0, 1)) + { + ACE_ASSERT (0); + return -1; + } + } + + ACE_Proactor::instance ()->run_event_loop (); + + // As Proactor event loop now is inactive it is safe to destroy all + // senders + + connector.stop (); + acceptor.stop (); + + // now compare the files - available only when on same machine + + int success = 0; + if (!client_only && !server_only) + { + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Comparing the input file and the output file...\n"))); + + success = -1; + // map the two files, then perform memcmp + { + ACE_Mem_Map original_file (input_file); + ACE_Mem_Map reconstructed_file (output_file); + + if (original_file.addr () && + original_file.addr () != MAP_FAILED && + reconstructed_file.addr () && + reconstructed_file.addr () != MAP_FAILED) + { + // compare lengths + if ((original_file.size () == reconstructed_file.size ()) && + // and if same size, compare file data + (0 == ACE_OS::memcmp (original_file.addr (), + reconstructed_file.addr (), + original_file.size ()))) + success = 0; + } + } + + if (0 == success) + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("input file and the output file identical!\n"))); + else + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("input file and the output file are different!\n"))); + } + + if (!client_only) + ACE_OS::unlink (output_file); + + if (0 != success) + ACE_ASSERT (0); + + ACE_END_TEST; + + return success; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Asynch_Acceptor<Receiver>; +template class ACE_Asynch_Connector<Sender>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Asynch_Acceptor<Receiver> +#pragma instantiate ACE_Asynch_Connector<Sender> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + +#else +int +main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test")); + + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Asynchronous Scatter/Gather IO is unsupported.\n") + ACE_TEXT ("Proactor_Scatter_Gather_Test will not be run."))); + + ACE_END_TEST; + + return 0; +} + +#endif /* (ACE_HAS_WINNT4 && ACE_HAS_WINNT4 != 0) && !ACE_HAS_WINCE) */ + diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp index 1a3a2c5cea4..d8c06c1423a 100644 --- a/tests/Proactor_Test.cpp +++ b/tests/Proactor_Test.cpp @@ -1,4 +1,4 @@ -// -*- C++ -*- +// $Id$ // ============================================================================ /** diff --git a/tests/run_test.lst b/tests/run_test.lst index 99f54949131..3f8a2307353 100644 --- a/tests/run_test.lst +++ b/tests/run_test.lst @@ -78,6 +78,7 @@ Pipe_Test: !chorus !VxWorks Priority_Buffer_Test Priority_Reactor_Test: !chorus Priority_Task_Test: !Unicos +Proactor_Scatter_Gather_Test: !chorus !VxWorks !LynxOS Proactor_Test: !chorus !VxWorks !LynxOS Proactor_Timer_Test: !chorus !VxWorks !LynxOS Process_Manager_Test: !chorus !VxWorks |