summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog45
-rw-r--r--ChangeLogs/ChangeLog-02a45
-rw-r--r--ChangeLogs/ChangeLog-03a45
-rw-r--r--THANKS3
-rw-r--r--ace/Configuration_Import_Export.cpp2
-rw-r--r--ace/Strategies_T.cpp2
-rw-r--r--tests/Makefile1
-rw-r--r--tests/Makefile.bor1
-rw-r--r--tests/Proactor_Scatter_Gather_Test.cpp1445
-rw-r--r--tests/Proactor_Test.cpp2
-rw-r--r--tests/run_test.lst1
11 files changed, 1579 insertions, 13 deletions
diff --git a/ChangeLog b/ChangeLog
index d7af6d45f21..9e289a308e6 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,41 @@
+Sat May 25 16:42:46 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
+
+ * tests/run_test.lst,
+ * tests/Makefile.bor,
+ * tests/Makefile: Added Proactor_Scatter_Gather_Test.
+
+Sat May 25 16:30:00 2002 Edan Ayal <edanayal@yahoo.com>
+
+ * tests/Proactor_Scatter_Gather_Test.cpp:
+ Added a new test for the asynch scatter/gather I/O
+ functionality. It is currently supported (as the feature itself)
+ only under Win32 - actually NT4 SP2 and above. The test runs in
+ a single thread, and involves a single Sender, two Receivers and
+ a single Writer. The Sender async-reads (scattered) from a file
+ into chunks of <page size>. It async-sends (gathered) the odd
+ chunks to the first receiver over a stream, and the even chunks
+ to the second receiver over a different stream. The receivers
+ async-read (scattered) from the socket streams into chunks in
+ size of <page size>, and convey the data to the Writer. The
+ Writer reconstructs the file using async-write
+ (gathered). Finally, the reconstructed file is compared to the
+ original file to determine test success. The test therefore
+ covers both async scatter/gather stream I/O and async
+ scatter/gather file I/O.
+
+Sat May 25 14:52:06 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
+
+ * ace/Strategies_T.cpp (make_svc_handler): Fixed a typo where
+ dll_name_ was misspelled as shared_library_. Thanks to Nathan
+ Krasney <natan-k@actcom.co.il> for reporting this.
+
+Fri May 24 10:42:25 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
+
+ * ace/Configuration_Import_Export.cpp (squish): Added a
+ check for '\n' to ensure that empty lines are handled properly.
+ Thanks to Vladimir Chovanec <Vladimir.CHOVANEC@asset.sk> for
+ reporting this.
+
Sat May 25 15:09:42 2002 Steve Huston <shuston@riverace.com>
* examples/C++NPv2/SLDex.{cpp mak}:
@@ -79,7 +117,7 @@ Fri May 24 10:18:39 2002 Steve Huston <shuston@riverace.com>
* ace/CDR_Stream.h (ACE_InputCDR): Added comments clarifying
the behavior of the ctor that takes a const ACE_Message_Block *.
- Thanks to Alexander Maack for motivating this.
+ Thanks to Alexander Maack for motivating this.
* examples/C++NPv2/display_logfile.cpp (svc): Fix memory leak
(thanks to Alexander Maack for reporting this) and don't lose
@@ -117,8 +155,9 @@ Mon May 13 19:15:49 2002 Steve Huston <shuston@riverace.com>
* netsvcs/servers/main.cpp: Move the reactor/signal initialization
to be after the service loading. If the user specifies -b (be a
- daemon) it closes all handles, including the reactor notification
- pipe. Also added ACE_TEXT decorator to the string literals.
+ daemon) it closes all handles, including the reactor
+ notification pipe. Also added ACE_TEXT decorator to the string
+ literals.
Fri May 24 13:29:05 UTC 2002 Craig Rodrigues <crodrigu@bbn.com>
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index d7af6d45f21..9e289a308e6 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,41 @@
+Sat May 25 16:42:46 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
+
+ * tests/run_test.lst,
+ * tests/Makefile.bor,
+ * tests/Makefile: Added Proactor_Scatter_Gather_Test.
+
+Sat May 25 16:30:00 2002 Edan Ayal <edanayal@yahoo.com>
+
+ * tests/Proactor_Scatter_Gather_Test.cpp:
+ Added a new test for the asynch scatter/gather I/O
+ functionality. It is currently supported (as the feature itself)
+ only under Win32 - actually NT4 SP2 and above. The test runs in
+ a single thread, and involves a single Sender, two Receivers and
+ a single Writer. The Sender async-reads (scattered) from a file
+ into chunks of <page size>. It async-sends (gathered) the odd
+ chunks to the first receiver over a stream, and the even chunks
+ to the second receiver over a different stream. The receivers
+ async-read (scattered) from the socket streams into chunks in
+ size of <page size>, and convey the data to the Writer. The
+ Writer reconstructs the file using async-write
+ (gathered). Finally, the reconstructed file is compared to the
+ original file to determine test success. The test therefore
+ covers both async scatter/gather stream I/O and async
+ scatter/gather file I/O.
+
+Sat May 25 14:52:06 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
+
+ * ace/Strategies_T.cpp (make_svc_handler): Fixed a typo where
+ dll_name_ was misspelled as shared_library_. Thanks to Nathan
+ Krasney <natan-k@actcom.co.il> for reporting this.
+
+Fri May 24 10:42:25 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
+
+ * ace/Configuration_Import_Export.cpp (squish): Added a
+ check for '\n' to ensure that empty lines are handled properly.
+ Thanks to Vladimir Chovanec <Vladimir.CHOVANEC@asset.sk> for
+ reporting this.
+
Sat May 25 15:09:42 2002 Steve Huston <shuston@riverace.com>
* examples/C++NPv2/SLDex.{cpp mak}:
@@ -79,7 +117,7 @@ Fri May 24 10:18:39 2002 Steve Huston <shuston@riverace.com>
* ace/CDR_Stream.h (ACE_InputCDR): Added comments clarifying
the behavior of the ctor that takes a const ACE_Message_Block *.
- Thanks to Alexander Maack for motivating this.
+ Thanks to Alexander Maack for motivating this.
* examples/C++NPv2/display_logfile.cpp (svc): Fix memory leak
(thanks to Alexander Maack for reporting this) and don't lose
@@ -117,8 +155,9 @@ Mon May 13 19:15:49 2002 Steve Huston <shuston@riverace.com>
* netsvcs/servers/main.cpp: Move the reactor/signal initialization
to be after the service loading. If the user specifies -b (be a
- daemon) it closes all handles, including the reactor notification
- pipe. Also added ACE_TEXT decorator to the string literals.
+ daemon) it closes all handles, including the reactor
+ notification pipe. Also added ACE_TEXT decorator to the string
+ literals.
Fri May 24 13:29:05 UTC 2002 Craig Rodrigues <crodrigu@bbn.com>
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index d7af6d45f21..9e289a308e6 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,41 @@
+Sat May 25 16:42:46 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
+
+ * tests/run_test.lst,
+ * tests/Makefile.bor,
+ * tests/Makefile: Added Proactor_Scatter_Gather_Test.
+
+Sat May 25 16:30:00 2002 Edan Ayal <edanayal@yahoo.com>
+
+ * tests/Proactor_Scatter_Gather_Test.cpp:
+ Added a new test for the asynch scatter/gather I/O
+ functionality. It is currently supported (as the feature itself)
+ only under Win32 - actually NT4 SP2 and above. The test runs in
+ a single thread, and involves a single Sender, two Receivers and
+ a single Writer. The Sender async-reads (scattered) from a file
+ into chunks of <page size>. It async-sends (gathered) the odd
+ chunks to the first receiver over a stream, and the even chunks
+ to the second receiver over a different stream. The receivers
+ async-read (scattered) from the socket streams into chunks in
+ size of <page size>, and convey the data to the Writer. The
+ Writer reconstructs the file using async-write
+ (gathered). Finally, the reconstructed file is compared to the
+ original file to determine test success. The test therefore
+ covers both async scatter/gather stream I/O and async
+ scatter/gather file I/O.
+
+Sat May 25 14:52:06 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
+
+ * ace/Strategies_T.cpp (make_svc_handler): Fixed a typo where
+ dll_name_ was misspelled as shared_library_. Thanks to Nathan
+ Krasney <natan-k@actcom.co.il> for reporting this.
+
+Fri May 24 10:42:25 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
+
+ * ace/Configuration_Import_Export.cpp (squish): Added a
+ check for '\n' to ensure that empty lines are handled properly.
+ Thanks to Vladimir Chovanec <Vladimir.CHOVANEC@asset.sk> for
+ reporting this.
+
Sat May 25 15:09:42 2002 Steve Huston <shuston@riverace.com>
* examples/C++NPv2/SLDex.{cpp mak}:
@@ -79,7 +117,7 @@ Fri May 24 10:18:39 2002 Steve Huston <shuston@riverace.com>
* ace/CDR_Stream.h (ACE_InputCDR): Added comments clarifying
the behavior of the ctor that takes a const ACE_Message_Block *.
- Thanks to Alexander Maack for motivating this.
+ Thanks to Alexander Maack for motivating this.
* examples/C++NPv2/display_logfile.cpp (svc): Fix memory leak
(thanks to Alexander Maack for reporting this) and don't lose
@@ -117,8 +155,9 @@ Mon May 13 19:15:49 2002 Steve Huston <shuston@riverace.com>
* netsvcs/servers/main.cpp: Move the reactor/signal initialization
to be after the service loading. If the user specifies -b (be a
- daemon) it closes all handles, including the reactor notification
- pipe. Also added ACE_TEXT decorator to the string literals.
+ daemon) it closes all handles, including the reactor
+ notification pipe. Also added ACE_TEXT decorator to the string
+ literals.
Fri May 24 13:29:05 UTC 2002 Craig Rodrigues <crodrigu@bbn.com>
diff --git a/THANKS b/THANKS
index 0916622d6d4..71ea2638331 100644
--- a/THANKS
+++ b/THANKS
@@ -1334,7 +1334,7 @@ Olof Lindfors <olof.lindfors@protegrity.com>
Tom Wagner <TomW@CoManage.net>
Kyle Brost <Kyle.Brost@quest.com>
Nicolas Vincent <Vincent.Nicolas@Radiometer.dk>
-Jonathan Wackley <jwackley@legato.com>
+Jonathan Wackley <jwackley@mountaincable.com>
Jan Kalin <jan.kalin@zag.si>
Andreas Huggel <huggel_andreas@isoftel.com>
Alain Totouom <atotouom@gmx.de>
@@ -1520,6 +1520,7 @@ Bill Tonseth <wtonseth@yahoo.com>
Frank Pilhofer <fp@fpx.de>
Eric Quere <Eric.Quere@med.ge.com>
Keith Thornton <keith.thornton.kt@germany.agfa.com>
+Nathan Krasney <natan-k@actcom.co.il>
I would particularly like to thank Paul Stephenson, who worked with me
at Ericsson in the early 1990's. Paul devised the recursive Makefile
diff --git a/ace/Configuration_Import_Export.cpp b/ace/Configuration_Import_Export.cpp
index 8e97896e2ea..8c24242ae01 100644
--- a/ace/Configuration_Import_Export.cpp
+++ b/ace/Configuration_Import_Export.cpp
@@ -598,7 +598,7 @@ ACE_Ini_ImpExp::squish (ACE_TCHAR *src)
// Now start at the beginning and move over all whitespace.
for (cp = src;
- (*cp != '\0') && ((*cp == ' ') || (*cp == '\t'));
+ (*cp != '\0') && ((*cp == ' ') || (*cp == '\t') || (*cp == '\n'));
cp++)
continue;
diff --git a/ace/Strategies_T.cpp b/ace/Strategies_T.cpp
index 949e7a9e524..f8c3052e2f2 100644
--- a/ace/Strategies_T.cpp
+++ b/ace/Strategies_T.cpp
@@ -72,7 +72,7 @@ ACE_DLL_Strategy<SVC_HANDLER>::make_svc_handler (SVC_HANDLER *&sh)
ACE_TRACE ("ACE_DLL_Strategy<SVC_HANDLER>::make_svc_handler");
// Open the shared library.
- ACE_SHLIB_HANDLE handle = ACE_OS::dlopen (this->shared_library_);
+ ACE_SHLIB_HANDLE handle = ACE_OS::dlopen (this->dll_name_);
// Extract the factory function.
SVC_HANDLER *(*factory)(void) =
diff --git a/tests/Makefile b/tests/Makefile
index c5f188198ed..963e5d58979 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -66,6 +66,7 @@ BIN = Aio_Platform_Test \
Object_Manager_Test \
OrdMultiSet_Test \
OS_Test \
+ Proactor_Scatter_Gather_Test \
Proactor_Test \
Proactor_Timer_Test \
Process_Mutex_Test \
diff --git a/tests/Makefile.bor b/tests/Makefile.bor
index dcf8377a9ab..973aea332af 100644
--- a/tests/Makefile.bor
+++ b/tests/Makefile.bor
@@ -66,6 +66,7 @@ NAMES = \
OrdMultiSet_Test \
OS_Test \
Pipe_Test \
+ Proactor_Scatter_Gather_Test \
Proactor_Test \
Proactor_Timer_Test \
Priority_Buffer_Test \
diff --git a/tests/Proactor_Scatter_Gather_Test.cpp b/tests/Proactor_Scatter_Gather_Test.cpp
new file mode 100644
index 00000000000..556ffe79f7c
--- /dev/null
+++ b/tests/Proactor_Scatter_Gather_Test.cpp
@@ -0,0 +1,1445 @@
+// $Id$
+
+// ============================================================================
+/**
+ * @file Proactor_Scatter_Gather_Test.cpp
+ *
+ * The test runs on a single thread, and involves a single Sender,
+ * two Receivers and a single Writer. The Sender async-reads
+ * (scattered) from a file into chunks of <page size>. It
+ * async-sends (gathered) the odd chunks to the first receiver over a
+ * stream, and the even chunks to the second receiver over a
+ * different stream. The receivers async-read (scattered) from the
+ * socket streams into chunks in size of <page size>, and convey the
+ * data to the Writer. The Writer reconstructs the file using
+ * async-write (gathered). Then, the reconstructed file is compared
+ * to the original file to determine test success. So, It covers both
+ * async scatter/gather stream I/O and async scatter/gather file I/O.
+ * The wire transfer protocol is very naive (and totally non
+ * reliable...) - when both connections are closed, EOF is assumed.
+ * The test can be also run in a seperated sender and receiver mode,
+ * to test real network influences.
+ *
+ * This test is based upon some building blocks taken from the
+ * Proactor_Test.cpp.
+ *
+ * @author Edan Ayal <edanayal@yahoo.com> */
+// ============================================================================
+
+#include "test_config.h"
+
+#if ((defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && !defined (ACE_HAS_WINCE))
+ // This currently only works on Win32 platforms (NT SP2 and above).
+ // Support for Unix platforms supporting POSIX aio calls should be added in future.
+
+#include "ace/Get_Opt.h"
+
+#include "ace/Proactor.h"
+#include "ace/Asynch_Acceptor.h"
+#include "ace/Asynch_Connector.h"
+
+#include "ace/SOCK_Connector.h"
+
+// For the Acceptor/Connector handlers maintenance lists
+static const int SENDERS = 1;
+static const int RECEIVERS = 2;
+
+// Port that we're receiving connections on.
+static u_short port = ACE_DEFAULT_SERVER_PORT;
+
+static ACE_TCHAR *host = ACE_LOCALHOST;
+
+// File that we're sending.
+static ACE_TCHAR *input_file = ACE_TEXT("Proactor_Scatter_Gather_Test.cpp");
+
+// Name of the output file.
+static ACE_TCHAR *output_file = ACE_TEXT("output");
+
+static bool client_only = false;
+static bool server_only = false;
+static size_t chunk_size = 0;
+
+enum
+{
+ ODD = 0,
+ EVEN
+};
+
+// *************************************************************
+// Some chunks chain helper routines
+// *************************************************************
+static int allocate_chunks_chain (ACE_Message_Block *&head_mb,
+ size_t number_of_chunks)
+{
+ ACE_Message_Block *pre_mb = 0;
+
+ for (size_t index = 0; index < number_of_chunks; ++index)
+ {
+#if defined (ACE_WIN32)
+ void *addr = ::VirtualAlloc (0,
+ chunk_size,
+ MEM_COMMIT,
+ PAGE_READWRITE);
+#else
+ void *addr = new char[chunk_size];
+#endif /* ACE_WIN32 */
+ if (addr)
+ {
+ ACE_Message_Block *mb = new ACE_Message_Block (ACE_static_cast (char *, addr),
+ chunk_size);
+ if (!head_mb)
+ head_mb = mb;
+
+ // chain them together
+ if (pre_mb)
+ pre_mb->cont (mb);
+ pre_mb = mb;
+ }
+ else
+ {
+ ACE_ASSERT (0);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static void
+free_chunks_chain (ACE_Message_Block *&mb)
+{
+ for (const ACE_Message_Block* msg = mb;
+ msg != 0;
+ msg = msg->cont ())
+ {
+#if defined (ACE_WIN32)
+ ::VirtualFree (msg->base (),
+ msg->size (),
+ MEM_DECOMMIT);
+#else
+ delete [] msg->base ();
+#endif /* ACE_WIN32 */
+ }
+
+ mb->release ();
+ mb = 0;
+}
+
+static int
+last_chunk (ACE_Message_Block *chain,
+ ACE_Message_Block *&last)
+{
+ if (!chain)
+ return 0;
+
+ size_t index = 1;
+ last = chain;
+ while (0 != last->cont ())
+ {
+ last = last->cont ();
+ ++index;
+ }
+
+ return index;
+}
+
+static void
+merge_odd_even_chains (ACE_Message_Block *odd_mb,
+ ACE_Message_Block *even_mb)
+{
+ ACE_Message_Block *pre_pre_mb = odd_mb;
+ ACE_Message_Block *pre_mb = even_mb;
+ ACE_Message_Block *curr_mb = odd_mb->cont ();
+
+ if (even_mb)
+ {
+ for (; curr_mb != 0; curr_mb = pre_pre_mb->cont ())
+ {
+ pre_pre_mb->cont (pre_mb);
+
+ // increment history pointers
+ pre_pre_mb = pre_mb;
+ pre_mb = curr_mb;
+ }
+
+ pre_pre_mb->cont (pre_mb);
+ pre_mb->cont (0);
+ }
+}
+
+static void
+split_odd_even_chains (ACE_Message_Block *odd_mb,
+ ACE_Message_Block *even_mb)
+{
+ ACE_Message_Block *pre_pre_mb = odd_mb;
+ ACE_Message_Block *pre_mb = even_mb;
+ ACE_Message_Block *curr_mb = (even_mb ? even_mb->cont () : 0);
+
+ for (; curr_mb != 0; curr_mb = curr_mb->cont ())
+ {
+ pre_pre_mb->cont (curr_mb);
+
+ // increment history pointers
+ pre_pre_mb = pre_mb;
+ pre_mb = curr_mb;
+ }
+
+ pre_pre_mb->cont (0);
+ if (pre_mb)
+ pre_mb->cont (0);
+}
+
+static void
+add_to_chunks_chain (ACE_Message_Block *&chunks_chain,
+ ACE_Message_Block *additional_chunks_chain)
+{
+ if (0 == chunks_chain)
+ chunks_chain = additional_chunks_chain;
+ else
+ {
+ ACE_Message_Block *last = 0;
+ last_chunk (chunks_chain, last);
+ if (last)
+ last->cont (additional_chunks_chain);
+ }
+}
+
+static void
+remove_empty_chunks (ACE_Message_Block *&chunks_chain)
+{
+ if (0 == chunks_chain)
+ return;
+
+ ACE_Message_Block *first_empty = chunks_chain;
+ ACE_Message_Block *pre_mb = 0;
+
+ while (first_empty->length () > 0 &&
+ 0 != first_empty->cont ())
+ {
+ pre_mb = first_empty;
+ first_empty = first_empty->cont ();
+ }
+
+ // break the chain there, and release the empty end (might be everything)
+ if (0 == first_empty->length ())
+ {
+ if (pre_mb) // might be 0, in case it's the entire chain
+ pre_mb->cont (0);
+
+ if (first_empty == chunks_chain)
+ chunks_chain = 0;
+
+ free_chunks_chain (first_empty);
+ }
+}
+
+// *************************************************************
+// Acceptor, Receiver and Writer
+// *************************************************************
+class Receiver;
+
+class Acceptor : public ACE_Asynch_Acceptor<Receiver>
+{
+ friend class Receiver;
+
+public:
+ Acceptor (void);
+ virtual ~Acceptor (void);
+
+ void stop (void);
+
+ // Virtual from ACE_Asynch_Acceptor
+ virtual Receiver *make_handler (void);
+
+ int get_number_sessions (void) { return this->sessions_; }
+
+private:
+ void on_new_receiver (Receiver &rcvr);
+ void on_delete_receiver (Receiver &rcvr);
+
+ int sessions_;
+ Receiver *list_receivers_[RECEIVERS];
+};
+
+class Writer;
+
+// The first instantiated take the role of the odd receiver
+class Receiver : public ACE_Service_Handler
+{
+ friend class Acceptor;
+ friend class Writer;
+
+public:
+ Receiver (Acceptor *acceptor = 0, int index = -1);
+ virtual ~Receiver (void);
+
+ /// This is called after the new connection has been accepted.
+ virtual void open (ACE_HANDLE handle,
+ ACE_Message_Block &message_block);
+protected:
+ /// This is called by the framework when asynchronous <read> operation from the
+ /// socket completes.
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+
+private:
+ int initiate_read_stream (void);
+
+ void check_destroy (void);
+
+ Acceptor *acceptor_;
+ int index_;
+
+ // Socket input
+ ACE_Asynch_Read_Stream rs_;
+ ACE_HANDLE socket_handle_;
+
+ // Writer
+ static Writer* writer_;
+
+ long io_count_;
+
+ char odd_;
+
+ // if we get non-page-size reminder, we will not send it to the writer
+ // until it is full (unless at end)
+ ACE_Message_Block *partial_chunk_;
+};
+
+class Writer : public ACE_Service_Handler
+{
+ friend class Receiver;
+
+public:
+ Writer (void);
+ virtual ~Writer (void);
+
+ void open (void);
+
+ // this is *not* a callback from the framework
+ int handle_read_chunks_chain (ACE_Message_Block *mb,
+ int type);
+
+ // for determining when last receiver dies
+ void on_new_receiver ();
+ void on_delete_receiver ();
+
+protected:
+ /// This is called by the framework when an asynchronous <write> to the file
+ /// completes.
+ virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
+
+private:
+ int initiate_write_file (void);
+
+private:
+ // Output file
+ ACE_Asynch_Write_File wf_;
+ ACE_HANDLE output_file_handle_;
+ size_t writing_file_offset_;
+ size_t reported_file_offset_;
+ ACE_Message_Block *odd_chain_;
+ ACE_Message_Block *even_chain_;
+ long io_count_;
+ char receiver_count_;
+};
+
+// *************************************************************
+// Receiver Impl
+// *************************************************************
+
+Writer *Receiver::writer_ = 0;
+
+Receiver::Receiver (Acceptor * acceptor, int index)
+ : acceptor_ (acceptor),
+ index_ (index),
+ socket_handle_ (ACE_INVALID_HANDLE),
+ io_count_ (0),
+ partial_chunk_ (0)
+{
+ // the first one is the odd one
+ this->odd_ = ((0 == index) ? 1 : 0);
+
+ if (this->odd_)
+ {
+ Receiver::writer_ = new Writer;
+ if (!Receiver::writer_)
+ {
+ ACE_ASSERT (0);
+ return;
+ }
+ }
+
+ Receiver::writer_->on_new_receiver ();
+
+ if (this->acceptor_ != 0)
+ this->acceptor_->on_new_receiver (*this);
+}
+
+Receiver::~Receiver (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Receiver::~Receiver\n")));
+
+ if (this->acceptor_ != 0)
+ this->acceptor_->on_delete_receiver (*this);
+
+ if (this->socket_handle_ != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (this->socket_handle_);
+
+ Receiver::writer_->on_delete_receiver ();
+
+ if (this->partial_chunk_)
+ {
+ ACE_ASSERT (0); // should not be getting here
+ this->partial_chunk_->release ();
+ }
+}
+
+void
+Receiver::check_destroy (void)
+{
+ if (this->io_count_ <= 0)
+ delete this;
+}
+
+void
+Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
+{
+ this->socket_handle_ = handle;
+
+ // Open the ACE_Asynch_Read_Stream
+ if (this->rs_.open (*this, this->socket_handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
+ else
+ {
+ if (this->odd_)
+ Receiver::writer_->open ();
+
+ this->initiate_read_stream ();
+ }
+
+ this->check_destroy ();
+}
+
+int
+Receiver::initiate_read_stream (void)
+{
+ if (!Receiver::writer_)
+ return -1;
+
+ // how many chunks to allocate?
+ size_t number_of_new_chunks = (this->partial_chunk_ ?
+ (ACE_IOV_MAX / RECEIVERS) - 1
+ : ACE_IOV_MAX / RECEIVERS);
+
+ // allocate chunks chain
+ ACE_Message_Block *head_mb = 0;
+ if (-1 == allocate_chunks_chain (head_mb, number_of_new_chunks))
+ {
+ ACE_ASSERT (0);
+ return -1;
+ }
+
+ // calculate how many bytes to read
+
+ // head_mb could be 0 (no new chunks allocated)
+ size_t bytes_to_read = head_mb ? head_mb->total_size () : 0;
+
+ // add the partial chunk at the front if appropriate, and update
+ // the number of bytes to read
+ if (this->partial_chunk_)
+ {
+ bytes_to_read += this->partial_chunk_->space ();
+ this->partial_chunk_->cont (head_mb);
+ head_mb = this->partial_chunk_;
+ this->partial_chunk_ = 0;
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Receiver::initiate_read_stream - (%s) readv %d\n"),
+ this->odd_ ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
+ bytes_to_read));
+
+ // perform the actual scattered read
+ if (this->rs_.readv (*head_mb,
+ bytes_to_read) == -1)
+ {
+ free_chunks_chain (head_mb);
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")),
+ -1);
+ }
+
+ ++this->io_count_;
+ return 0;
+}
+
+void
+Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ ACE_Message_Block *mb = &result.message_block ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Receiver::handle_read_stream - (%s) read %d\n"),
+ this->odd_ ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
+ result.bytes_transferred ()));
+
+ // Transfer only complete chunks to the writer.
+ // Save last partial chunk for the next call.
+ // On disconnect (error or 0 transferred), transfer whatever we have.
+
+ // at this stage there should not be anything there
+ ACE_ASSERT (!this->partial_chunk_);
+
+ // first, remove the empty chunks
+ remove_empty_chunks (mb);
+
+ if (mb && Receiver::writer_)
+ { // there's something to write, and who to write to
+
+ // write everything or only complete chunks?
+
+ // write everything - when no new bytes were transferred
+ bool write_everything = !result.bytes_transferred ();
+ if (write_everything)
+ Receiver::writer_->handle_read_chunks_chain (mb,
+ this->odd_ ? ODD : EVEN);
+ else
+ { // filter out the partial chunk at the end (if present)
+ // and save it for later before writing the full chunks
+
+ // have this->partial_chunk_ point to the last chunk in the chain
+ size_t last_index = last_chunk (mb, this->partial_chunk_);
+ if (this->partial_chunk_ &&
+ this->partial_chunk_->length () < chunk_size)
+ { // found partial chunk at end of chain
+ // detach it from the chain
+ if (last_index > 1) // chain bigger than 1
+ {
+ ACE_Message_Block *pre_last = mb;
+ for (size_t index = 1; index < last_index - 1; ++index)
+ pre_last = pre_last->cont ();
+
+ // detach partial chunk from chain
+ pre_last->cont (0);
+ }
+ else
+ // chain in length of 1 - so we need to zero mb
+ mb = 0;
+ }
+ else // last is a full chunk, so hand it over with the rest
+ this->partial_chunk_ = 0;
+
+ // transfer (if there's anything left)
+ if (mb && mb->total_length ())
+ Receiver::writer_->handle_read_chunks_chain (
+ mb,
+ this->odd_ ? ODD : EVEN);
+
+ // initiate more reads only if no error
+ if (!result.error ())
+ this->initiate_read_stream ();
+ else
+ ACE_ASSERT (0);
+ }
+ }
+ else if (mb && !Receiver::writer_)
+ // no one to write to
+ free_chunks_chain (mb);
+
+ --this->io_count_;
+
+ this->check_destroy ();
+}
+
+// *************************************************************
+// Acceptor Impl
+// *************************************************************
+
+Acceptor::Acceptor (void)
+ : sessions_ (0)
+{
+ for (int i = 0; i < RECEIVERS; ++i)
+ this->list_receivers_[i] = 0;
+}
+
+Acceptor::~Acceptor (void)
+{
+ this->stop ();
+}
+
+
+void
+Acceptor::stop (void)
+{
+ // This method can be called only after proactor event loop is done
+ // in all threads.
+ for (int i = 0; i < RECEIVERS; ++i)
+ {
+ delete this->list_receivers_[i];
+ this->list_receivers_[i] = 0;
+ }
+}
+
+void
+Acceptor::on_new_receiver (Receiver & rcvr)
+{
+ ++this->sessions_;
+ this->list_receivers_[rcvr.index_] = &rcvr;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"),
+ this->sessions_));
+}
+
+void
+Acceptor::on_delete_receiver (Receiver & rcvr)
+{
+ --this->sessions_;
+ if (rcvr.index_ >= 0
+ && rcvr.index_ < RECEIVERS
+ && this->list_receivers_[rcvr.index_] == &rcvr)
+ this->list_receivers_[rcvr.index_] = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"),
+ this->sessions_));
+}
+
+Receiver *
+Acceptor::make_handler (void)
+{
+ if (this->sessions_ >= RECEIVERS)
+ return 0;
+
+ for (int i = 0; i < RECEIVERS; ++i)
+ {
+ if (this->list_receivers_[i] == 0)
+ {
+ ACE_NEW_RETURN (this->list_receivers_[i],
+ Receiver (this, i),
+ 0);
+ return this->list_receivers_[i];
+ }
+ }
+
+ return 0;
+}
+
+// *************************************************************
+// Writer Impl
+// *************************************************************
+
+Writer::Writer (void)
+: output_file_handle_ (ACE_INVALID_HANDLE),
+ writing_file_offset_ (0),
+ reported_file_offset_ (0),
+ odd_chain_ (0),
+ even_chain_ (0),
+ io_count_ (0),
+ receiver_count_ (0)
+{
+}
+
+Writer::~Writer (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Writer::~Writer\n")));
+
+ if (this->output_file_handle_ != ACE_INVALID_HANDLE)
+ ACE_OS::close (this->output_file_handle_);
+
+ Receiver::writer_ = 0;
+}
+
+void
+Writer::on_new_receiver ()
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Writer::on_new_receiver\n")));
+
+ ++this->receiver_count_;
+}
+
+void
+Writer::on_delete_receiver ()
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Writer::on_delete_receiver\n")));
+
+ --this->receiver_count_;
+
+ if (0 == this->receiver_count_)
+ {
+ if (this->io_count_ <= 0)
+ // no pending io, so do the work oursleves
+ // (if pending io, they'll see the zero receiver count)
+ this->initiate_write_file ();
+ }
+}
+
+void
+Writer::open (void)
+{
+ // Open the file for output
+ if (ACE_INVALID_HANDLE == (this->output_file_handle_ = ACE_OS::open (output_file,
+ O_CREAT | _O_TRUNC | _O_WRONLY |\
+ FILE_FLAG_OVERLAPPED |\
+ FILE_FLAG_NO_BUFFERING,
+ ACE_DEFAULT_FILE_PERMS)))
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Writer::open::ACE_OS::open")));
+ // Open the ACE_Asynch_Write_File
+ else if (this->wf_.open (*this, this->output_file_handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Writer::open::ACE_Asynch_Write_File::open")));
+}
+
+int
+Writer::handle_read_chunks_chain (ACE_Message_Block *mb,
+ int type)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Writer::handle_read_chunks_chain - (%s) %d bytes\n"),
+ (type == ODD) ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
+ mb->total_length ()));
+
+ add_to_chunks_chain (ODD == type ? this->odd_chain_ : this->even_chain_, mb);
+
+ this->initiate_write_file ();
+
+ return 0;
+}
+
+int
+Writer::initiate_write_file (void)
+{
+ // find out how much can we merge
+ ACE_Message_Block *dummy_last = 0;
+ size_t odd_count = last_chunk (this->odd_chain_, dummy_last);
+ size_t even_count = last_chunk (this->even_chain_, dummy_last);
+
+ size_t merge_size = ACE_MIN (ACE_MIN (odd_count, even_count),
+ (size_t) ACE_IOV_MAX);
+
+ // the options here are as follows:
+ // io_count_ can be zero or greater.
+ // merge_size can be zero or not.
+ // if non zero merge, write the merge. ASSERT receiver_count_ is non zero too.
+ // if zero merge:
+ // if receiver_count_ is non zero, NOOP.
+ // if zero receiver_count_, we should write whatever is left,
+ // and terminate the writer at completion.
+ // if nothing to write, and io_count_ is zero too, terminate here.
+
+ if (0 == merge_size &&
+ 0 != this->receiver_count_)
+ return 0;
+
+ if (0 == merge_size &&
+ 0 == this->receiver_count_ &&
+ 0 == odd_count &&
+ 0 == even_count &&
+ 0 == this->io_count_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Writer::initiate_write_file")
+ ACE_TEXT (" - ending proactor event loop\n")));
+
+ ACE_Proactor::instance ()->end_event_loop ();
+
+ delete this;
+
+ return 0;
+ }
+
+ // if we reached nere and merge_size is zero, we should write whatever is
+ // in the queues (1 to 2 chunks together), so let's force the merge size to 1.
+ if (0 == merge_size)
+ {
+ ACE_ASSERT (1 == odd_count && 1 >= even_count);
+ merge_size = 1;
+ }
+
+ // Now that we found out what we want to do, prepare the chain
+ // that will be written, and update the remainders
+ ACE_Message_Block *new_odd_chain_head = this->odd_chain_;
+ ACE_Message_Block *new_even_chain_head = this->even_chain_;
+
+ // locate the place for detachment in the chains
+ ACE_Message_Block *pre_odd = 0;
+ ACE_Message_Block *pre_even = 0;
+ for (size_t index = 0; index < merge_size; ++index)
+ {
+ pre_odd = new_odd_chain_head;
+ if (new_odd_chain_head)
+ new_odd_chain_head = new_odd_chain_head->cont ();
+ pre_even = new_even_chain_head;
+ if (new_even_chain_head)
+ new_even_chain_head = new_even_chain_head->cont ();
+ }
+ // now detach the chain
+ if (pre_odd)
+ pre_odd->cont (0);
+ if (pre_even)
+ pre_even->cont (0);
+
+ // perform merge between the two chains
+ merge_odd_even_chains (this->odd_chain_, this->even_chain_);
+
+ // and now finally perform the write
+ ACE_Message_Block *united_mb = this->odd_chain_;
+ size_t increment_writing_file_offset = united_mb->total_length ();
+
+ // Reconstruct the file
+ // Write the size, not the length, because we must write in chunks of <page size>
+ if (this->wf_.writev (*united_mb,
+ united_mb->total_size (),
+ this->writing_file_offset_) == -1)
+ {
+ free_chunks_chain (united_mb);
+
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Writer::initiate_write_file::ACE_Asynch_Write_Stream::writev")),
+ -1);
+ }
+
+ // we update now because otherwise, we'd have error when performing
+ // pipelined writing (that is, mulitple calls to write before the callbacks
+ // to handle_x)
+ this->writing_file_offset_ += increment_writing_file_offset;
+
+ // update the remainders of the chains
+ this->odd_chain_ = new_odd_chain_head;
+ this->even_chain_ = new_even_chain_head;
+
+ ++this->io_count_;
+ return 0;
+}
+
+void
+Writer::handle_write_file (const ACE_Asynch_Write_File::Result &result)
+{
+ ACE_Message_Block *mb = &result.message_block ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Writer::handle_write_file - offset %d + %d\n"),
+ this->reported_file_offset_,
+ result.bytes_transferred ()));
+
+ this->reported_file_offset_ += result.bytes_transferred ();
+
+ // Always truncate as required,
+ // because partial will always be the last write to a file
+ ACE_Message_Block *last_mb = mb;
+ last_chunk (mb, last_mb);
+
+ if (last_mb->space ())
+ ACE_OS::truncate (output_file, this->reported_file_offset_ - last_mb->space ());
+
+ free_chunks_chain (mb);
+
+ --this->io_count_;
+
+ // end of process?
+ if (0 == this->receiver_count_ &&
+ 0 == this->io_count_)
+ {
+ ACE_ASSERT (0 == this->odd_chain_ && 0 == this->even_chain_);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Writer::handle_write_file"),
+ ACE_TEXT (" - ending proactor event loop\n")));
+
+ ACE_Proactor::instance ()->end_event_loop ();
+
+ delete this;
+ }
+}
+
+// *************************************************************
+// Connector and Sender
+// *************************************************************
+class Sender;
+
+class Connector : public ACE_Asynch_Connector<Sender>
+{
+ friend class Sender;
+
+public:
+ Connector (void);
+ virtual ~Connector (void);
+
+ void stop (void);
+
+ // Virtual from ACE_Asynch_Connector
+ virtual Sender *make_handler (void);
+
+private:
+ void on_new_sender (Sender &rcvr);
+ void on_delete_sender (Sender &rcvr);
+
+ int sessions_;
+ Sender *list_senders_[SENDERS];
+};
+
+class Sender : public ACE_Service_Handler
+{
+ friend class Connector;
+public:
+
+ Sender (Connector *connector = 0, int index = -1);
+
+ virtual ~Sender (void);
+
+ /// This is called after the new connection has been established.
+ virtual void open (ACE_HANDLE handle,
+ ACE_Message_Block &message_block);
+
+ // This is called by the framework when asynchronous reads from the file complete
+ virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
+
+ // This is called by the framework when asynchronous writes from the socket complete
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+
+private:
+ void check_destroy (void);
+
+ int initiate_read_file (void);
+
+ int initiate_write_stream (ACE_Message_Block &mb);
+
+ int index_;
+ Connector * connector_;
+
+ // File to read from
+ ACE_Asynch_Read_File rf_;
+ ACE_HANDLE input_file_handle_;
+ size_t file_offset_;
+
+ // Sockets to send to
+ // odd and even socket output streams
+ ACE_Asynch_Write_Stream ws_[RECEIVERS];
+ ACE_HANDLE socket_handle_[RECEIVERS];
+
+ long io_count_;
+};
+
+// *************************************************************
+// Connector Impl
+// *************************************************************
+
+Connector::Connector (void)
+ : sessions_ (0)
+{
+ for (int i = 0; i < SENDERS; ++i)
+ this->list_senders_[i] = 0;
+}
+
+Connector::~Connector (void)
+{
+ this->stop ();
+}
+
+void
+Connector::stop (void)
+{
+ // This method can be called only after proactor event loop is done
+ // in all threads.
+
+ for (int i = 0; i < SENDERS; ++i)
+ {
+ delete this->list_senders_[i];
+ this->list_senders_[i] = 0;
+ }
+}
+
+void
+Connector::on_new_sender (Sender &sndr)
+{
+ ++this->sessions_;
+ this->list_senders_[sndr.index_] = &sndr;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sender::CTOR sessions_ = %d\n"),
+ this->sessions_));
+}
+
+void
+Connector::on_delete_sender (Sender &sndr)
+{
+ --this->sessions_;
+ if (sndr.index_ >= 0
+ && sndr.index_ < SENDERS
+ && this->list_senders_[sndr.index_] == &sndr)
+ this->list_senders_[sndr.index_] = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sender::~DTOR sessions_ = %d\n"),
+ this->sessions_));
+}
+
+Sender *
+Connector::make_handler (void)
+{
+ if (this->sessions_ >= SENDERS)
+ return 0;
+
+ for (int i = 0; i < SENDERS; ++i)
+ {
+ if (this->list_senders_ [i] == 0)
+ {
+ ACE_NEW_RETURN (this->list_senders_[i],
+ Sender (this, i),
+ 0);
+ return this->list_senders_[i];
+ }
+ }
+
+ return 0;
+}
+
+// *************************************************************
+// Sender Impl
+// *************************************************************
+
+Sender::Sender (Connector * connector, int index)
+ : index_ (index),
+ connector_ (connector),
+ input_file_handle_ (ACE_INVALID_HANDLE),
+ file_offset_ (0),
+ io_count_ (0)
+{
+ socket_handle_[ODD] = socket_handle_[EVEN] = ACE_INVALID_HANDLE;
+
+ if (this->connector_ != 0)
+ this->connector_->on_new_sender (*this);
+}
+
+Sender::~Sender (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sender::~Sender\n")));
+
+ if (this->connector_ != 0)
+ this->connector_->on_delete_sender (*this);
+
+ if (this->socket_handle_[ODD] != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (this->socket_handle_[ODD]);
+
+ if (this->socket_handle_[EVEN] != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (this->socket_handle_[EVEN]);
+
+ if (this->input_file_handle_ != ACE_INVALID_HANDLE)
+ ACE_OS::close (this->input_file_handle_);
+
+ if (client_only)
+ ACE_Proactor::instance ()->end_event_loop ();
+}
+
+// return true if we alive, false we commited suicide
+void
+Sender::check_destroy (void)
+{
+ if (this->io_count_ <= 0)
+ delete this;
+}
+
+void
+Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
+{
+ this->socket_handle_[ODD] = handle;
+
+ // Open the input file
+ if (ACE_INVALID_HANDLE == (this->input_file_handle_ = ACE_OS::open (input_file,
+ _O_RDONLY |\
+ FILE_FLAG_OVERLAPPED |\
+ FILE_FLAG_NO_BUFFERING,
+ ACE_DEFAULT_FILE_PERMS)))
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::open::ACE_OS::open")));
+ }
+ else
+ {
+ // now connect (w/o the connector factory) to the even (=second) receiver:
+ // we don't connect thru the factory in order not to instantiate another Sender
+ ACE_SOCK_Connector sock_connector;
+ ACE_SOCK_Stream sock_stream;
+ if (-1 == sock_connector.connect (sock_stream,
+ ACE_INET_Addr (port, host)))
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::open::ACE_SOCK_Connector::connect")));
+
+ else
+ {
+ this->socket_handle_[EVEN] = sock_stream.get_handle ();
+
+ // Open odd ACE_Asynch_Write_Stream
+ if (this->ws_[ODD].open (*this, this->socket_handle_[ODD]) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
+
+ // Open even ACE_Asynch_Write_Stream
+ else if (this->ws_[EVEN].open (*this, this->socket_handle_[EVEN]) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
+
+ // Open ACE_Asynch_Read_File
+ else if (this->rf_.open (*this, this->input_file_handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::open::ACE_Asynch_Read_File::open")));
+ else
+ // Start an asynchronous read
+ this->initiate_read_file ();
+ }
+ }
+
+ this->check_destroy ();
+}
+
+int
+Sender::initiate_read_file (void)
+{
+ ACE_ASSERT (0 == this->file_offset_ % chunk_size);
+
+ static const size_t file_size = ACE_OS::filesize (input_file);
+
+ static const size_t number_of_chunks_needed_for_file
+ = ACE_OS::ceil ((double) file_size / chunk_size);
+
+ size_t relevant_number_of_chunks =
+ ACE_MIN ((size_t)ACE_IOV_MAX,
+ number_of_chunks_needed_for_file - (this->file_offset_ / chunk_size));
+
+ if (!relevant_number_of_chunks)
+ {
+ ACE_ASSERT (0); // Just 2 C it coming
+ return 0;
+ }
+
+ ACE_Message_Block *head_mb = 0;
+ if (-1 == allocate_chunks_chain (head_mb, relevant_number_of_chunks))
+ {
+ ACE_ASSERT (0);
+ return -1;
+ }
+
+ // Inititiate read
+ if (this->rf_.readv (*head_mb,
+ head_mb->total_size (),
+ this->file_offset_) == -1)
+ {
+ free_chunks_chain (head_mb);
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::initiate_read_file::")
+ ACE_TEXT ("ACE_Asynch_Read_Stream::readv")),
+ -1);
+ }
+
+ ++this->io_count_;
+ return 0;
+}
+
+int
+Sender::initiate_write_stream (ACE_Message_Block &mb)
+{
+ // send the odd to the first connection, and the even to the second
+ // connection.
+
+ ACE_Message_Block *odd_mb = &mb;
+ ACE_Message_Block *even_mb = mb.cont ();
+
+ split_odd_even_chains (odd_mb, even_mb);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sender::initiate_write_stream - (ODD ) writev %d\n"),
+ odd_mb->total_length ()));
+
+ if (this->ws_[ODD].writev (*odd_mb, odd_mb->total_length ()) == -1)
+ {
+ free_chunks_chain (odd_mb);
+
+ if (even_mb)
+ free_chunks_chain (even_mb);
+
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
+ -1);
+ }
+
+ ++this->io_count_;
+
+ if (even_mb)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sender::initiate_write_stream - (EVEN) writev %d\n"),
+ even_mb->total_length ()));
+
+ if (this->ws_[EVEN].writev (*even_mb, even_mb->total_length ()) == -1)
+ {
+ free_chunks_chain (even_mb);
+
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
+ -1);
+ }
+
+ ++this->io_count_;
+ }
+
+ return 0;
+}
+
+void
+Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result)
+{
+ ACE_Message_Block *mb = &result.message_block ();
+
+ if (result.error () == 0 && result.bytes_transferred () != 0)
+ {
+ size_t bytes_transferred = result.bytes_transferred ();
+ size_t chunks_chain_size = mb->total_size ();
+
+ this->file_offset_ += bytes_transferred;
+
+ this->initiate_write_stream (*mb);
+
+ // and read more if required
+ if (bytes_transferred == chunks_chain_size)
+ this->initiate_read_file ();
+ }
+ else
+ free_chunks_chain (mb);
+
+ --this->io_count_;
+
+ this->check_destroy ();
+}
+
+void
+Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ ACE_Message_Block *mb = &result.message_block ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sender::handle_write_stream - wrote %d bytes\n"),
+ result.bytes_transferred ()));
+
+ if (result.error () == 0 && result.bytes_transferred () != 0)
+ // verify sent all
+ ACE_ASSERT (0 == mb->total_length ());
+ else
+ ACE_ASSERT (0);
+
+ free_chunks_chain (mb);
+
+ --this->io_count_;
+
+ this->check_destroy ();
+}
+
+// *************************************************************
+// Configuration helpers
+// *************************************************************
+int
+print_usage (int /* argc */, ACE_TCHAR *argv[])
+{
+ ACE_ERROR
+ ((LM_ERROR,
+ ACE_TEXT ("\nusage: %s")
+ ACE_TEXT ("\n-f <input file>\n")
+ ACE_TEXT ("\n-c client only (reader-sender)")
+ ACE_TEXT ("\n-s server only (receiver-writer)")
+ ACE_TEXT ("\n-h host to connect to")
+ ACE_TEXT ("\n-p port")
+ ACE_TEXT ("\n-u show this message")
+ ACE_TEXT ("\n"),
+ argv[0]
+ ));
+ return -1;
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ if (argc == 1) // no arguments , so one button test
+ return 0;
+
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("f:csh:p:u"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ {
+ switch (c)
+ {
+ case 'f':
+ input_file = get_opt.opt_arg ();
+ break;
+ case 'c':
+ client_only = true;
+ server_only = false;
+ break;
+ case 's':
+ server_only = true;
+ client_only = false;
+ break;
+ case 'h':
+ host = get_opt.opt_arg ();
+ break;
+ case 'p':
+ port = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'u':
+ default:
+ return print_usage (argc, argv);
+ } // switch
+ } // while
+
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
+
+ if (::parse_args (argc, argv) == -1)
+ return -1;
+
+ if (client_only)
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Running as client only\n")));
+ else if (server_only)
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Running as server only\n")));
+ else
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Running as server and client\n")));
+
+ chunk_size = ACE_OS::getpagesize ();
+
+ Acceptor acceptor;
+ Connector connector;
+
+ if (!server_only)
+ {
+ if (-1 == connector.open (1, ACE_Proactor::instance ()))
+ {
+ ACE_ASSERT (0);
+ return -1;
+ }
+
+ // connect to first destination
+ if (-1 == connector.connect (ACE_INET_Addr (port, host)))
+ {
+ ACE_ASSERT (0);
+ return -1;
+ }
+ }
+
+ if (!client_only)
+ {
+ // Simplify, initial read with zero size
+ if (-1 == acceptor.open (ACE_INET_Addr (port), 0, 1))
+ {
+ ACE_ASSERT (0);
+ return -1;
+ }
+ }
+
+ ACE_Proactor::instance ()->run_event_loop ();
+
+ // As Proactor event loop now is inactive it is safe to destroy all
+ // senders
+
+ connector.stop ();
+ acceptor.stop ();
+
+ // now compare the files - available only when on same machine
+
+ int success = 0;
+ if (!client_only && !server_only)
+ {
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Comparing the input file and the output file...\n")));
+
+ success = -1;
+ // map the two files, then perform memcmp
+ {
+ ACE_Mem_Map original_file (input_file);
+ ACE_Mem_Map reconstructed_file (output_file);
+
+ if (original_file.addr () &&
+ original_file.addr () != MAP_FAILED &&
+ reconstructed_file.addr () &&
+ reconstructed_file.addr () != MAP_FAILED)
+ {
+ // compare lengths
+ if ((original_file.size () == reconstructed_file.size ()) &&
+ // and if same size, compare file data
+ (0 == ACE_OS::memcmp (original_file.addr (),
+ reconstructed_file.addr (),
+ original_file.size ())))
+ success = 0;
+ }
+ }
+
+ if (0 == success)
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("input file and the output file identical!\n")));
+ else
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("input file and the output file are different!\n")));
+ }
+
+ if (!client_only)
+ ACE_OS::unlink (output_file);
+
+ if (0 != success)
+ ACE_ASSERT (0);
+
+ ACE_END_TEST;
+
+ return success;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Asynch_Acceptor<Receiver>;
+template class ACE_Asynch_Connector<Sender>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Asynch_Acceptor<Receiver>
+#pragma instantiate ACE_Asynch_Connector<Sender>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
+#else
+int
+main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
+
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Asynchronous Scatter/Gather IO is unsupported.\n")
+ ACE_TEXT ("Proactor_Scatter_Gather_Test will not be run.")));
+
+ ACE_END_TEST;
+
+ return 0;
+}
+
+#endif /* (ACE_HAS_WINNT4 && ACE_HAS_WINNT4 != 0) && !ACE_HAS_WINCE) */
+
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp
index 1a3a2c5cea4..d8c06c1423a 100644
--- a/tests/Proactor_Test.cpp
+++ b/tests/Proactor_Test.cpp
@@ -1,4 +1,4 @@
-// -*- C++ -*-
+// $Id$
// ============================================================================
/**
diff --git a/tests/run_test.lst b/tests/run_test.lst
index 99f54949131..3f8a2307353 100644
--- a/tests/run_test.lst
+++ b/tests/run_test.lst
@@ -78,6 +78,7 @@ Pipe_Test: !chorus !VxWorks
Priority_Buffer_Test
Priority_Reactor_Test: !chorus
Priority_Task_Test: !Unicos
+Proactor_Scatter_Gather_Test: !chorus !VxWorks !LynxOS
Proactor_Test: !chorus !VxWorks !LynxOS
Proactor_Timer_Test: !chorus !VxWorks !LynxOS
Process_Manager_Test: !chorus !VxWorks