summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog-98b9
-rw-r--r--ace/Asynch_IO.cpp343
-rw-r--r--ace/Asynch_IO.h131
-rw-r--r--examples/Reactor/Proactor/test_proactor.cpp8
4 files changed, 335 insertions, 156 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b
index c99d1a10962..3b18b95ff49 100644
--- a/ChangeLog-98b
+++ b/ChangeLog-98b
@@ -1,3 +1,12 @@
+Fri Sep 18 18:06:19 1998 Alexander Babu Arulanthu <alex@cs.wustl.edu>
+
+ * ace/Asynch_IO.h:
+ * ace/Asynch_IO.cpp:
+ Implemented Asynch_Accept for POSIX4 systems, by having only one
+ thread doing <handle_events>. Asynch_Accept_Handler has been
+ changed to act as Event Handler now. No problem of having threads
+ blocking on the <accept> now.
+
Fri Sep 18 13:26:50 1998 David L. Levine <levine@cs.wustl.edu>
* tests/High_Res_Timer_Test.cpp (check): raised success
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp
index 957d454079c..dbfabdc79d7 100644
--- a/ace/Asynch_IO.cpp
+++ b/ace/Asynch_IO.cpp
@@ -18,6 +18,132 @@ ACE_RCSID(ace, Asynch_IO, "$Id$")
#include "ace/Asynch_IO.i"
#endif /* __ACE_INLINE__ */
+#if defined (ACE_HAS_AIO_CALLS)
+class ACE_Export ACE_Asynch_Accept_Handler : public ACE_Event_Handler
+{
+ // = TITLE
+ // For the POSIX implementation, this class takes care of doing
+ // Asynch_Accept.
+ //
+ // = DESCRIPTION
+ //
+public:
+ ACE_Asynch_Accept_Handler (ACE_Reactor* reactor);
+ // Constructor.
+
+ ~ACE_Asynch_Accept_Handler (void);
+ // Destructor.
+
+ int register_accept_call (ACE_Asynch_Accept::Result* result);
+ // Register this <accept> call with the local handler.
+
+ virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
+ // Called when accept event comes up on the <listen_handle>.
+
+private:
+ ACE_Asynch_Accept::Result* deregister_accept_call (void);
+ // Undo the things done when registering.
+
+ ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*> result_queue_;
+ // Queue of Result pointers that correspond to all the <accept>'s
+ // pending.
+
+ ACE_Reactor* reactor_;
+ // Reactor used by the Asynch_Accept. We need this here to enable
+ // and disable the <handle> now and then, depending on whether any
+ // <accept> is pending or no.
+
+ ACE_Thread_Mutex lock_;
+ // The lock to protect the result queue which is shared. The queue
+ // is updated by main thread in the register function call and
+ // through the auxillary thread in the deregister fun. So let us
+ // mutex it.
+
+ ACE_Asynch_Accept_Handler (void);
+ // Default constructor shouldn't be called. Without a reactor, this
+ // class wont make sense.
+};
+#endif /* ACE_HAS_AIO_CALLS*/
+
+#if defined (ACE_HAS_AIO_CALLS)
+class ACE_Export ACE_Asynch_Transmit_Handler : public ACE_Handler
+{
+ // = TITLE
+ // Auxillary handler for doing <Asynch_Transmit_File> in
+ // Unix. <ACE_Asynch_Transmit_File> internally uses this.
+ //
+ // = DESCRIPTION
+ // This is a helper class for implementing
+ // <ACE_Asynch_Transmit_File> in Unix systems.
+public:
+ ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result,
+ ACE_Proactor *proactor);
+ // Constructor. Result pointer will have all the information to do
+ // the file transmission (socket, file, application handler, bytes
+ // to write....) and the the <proactor> pointer tells this class
+ // the <proactor> that is being used by the
+ // Asynch_Transmit_Operation and the application.
+
+ virtual ~ACE_Asynch_Transmit_Handler (void);
+ // Destructor.
+
+ int transmit (void);
+ // Do the transmission. All the info to do the transmission is in
+ // the <result> member.
+
+protected:
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when asynchronous writes from the socket complete.
+
+ virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
+ // This is called when asynchronous reads from the file complete.
+private:
+ int initiate_read_file (void);
+ // Issue asynch read from the file.
+
+ ACE_Asynch_Transmit_File::Result *result_;
+ // The asynch result pointer made from the initial transmit file
+ // request.
+
+ ACE_Proactor *proactor_;
+ // The Proactor that is being used by the application handler and
+ // so the Asynch_Transmit_File.
+
+ ACE_Asynch_Read_File rf_;
+ // To read from the file to be transmitted.
+
+ ACE_Asynch_Write_Stream ws_;
+ // Write stream to write the header, trailer and the data.
+
+ ACE_Message_Block *mb_;
+ // Message bloack used to do the txn.
+
+ enum ACT
+ {
+ HEADER_ACT = 1,
+ DATA_ACT = 2,
+ TRAILER_ACT = 3
+ };
+
+ ACT header_act_;
+ ACT data_act_;
+ ACT trailer_act_;
+ // ACT to transmit header, data and trailer.
+
+ size_t file_offset_;
+ // Current offset of the file being transmitted.
+
+ size_t file_size_;
+ // Total size of the file.
+
+ size_t bytes_transferred_;
+ // Number of bytes transferred on the stream.
+
+ size_t transmit_file_done_;
+ // Flag to indicate that the transmitting is over.
+};
+#endif /* ACE_HAS_AIO_CALLS */
+
ACE_Asynch_Result::ACE_Asynch_Result (ACE_Handler &handler,
const void* act,
ACE_HANDLE event,
@@ -138,13 +264,16 @@ ACE_Asynch_Operation::open (ACE_Handler &handler,
}
#if defined (ACE_HAS_AIO_CALLS)
+
// AIO stuff is present. So no registering.
// @@ But something has to be done to associate completion key with
// the handle provided. How about a HashTable of size "the number of
// file descriptors that are possible in the system".
ACE_UNUSED_ARG (completion_key);
return 0;
+
#else /* ACE_HAS_AIO_CALLS */
+
// Register with the <proactor>.
return this->proactor_->register_handle (this->handle_,
completion_key);
@@ -155,8 +284,10 @@ int
ACE_Asynch_Operation::cancel (void)
{
#if defined (ACE_HAS_AIO_CALLS)
+
// @@ <aio_cancel> will come here soon.
return 0;
+
#elif (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && (defined (_MSC_VER) && (_MSC_VER > 1020))
// All I/O operations that are canceled will complete with the error
// ERROR_OPERATION_ABORTED. All completion notifications for the I/O
@@ -764,7 +895,43 @@ ACE_Asynch_Write_File::Result::complete (u_long bytes_transferred,
// ************************************************************
ACE_Asynch_Accept::ACE_Asynch_Accept (void)
+ : accept_handler_ (0)
{
+ ACE_NEW (this->accept_handler_,
+ ACE_Asynch_Accept_Handler (&this->reactor_));
+}
+
+int
+ACE_Asynch_Accept::open (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ const void *completion_key,
+ ACE_Proactor *proactor)
+{
+
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::open called\n"));
+
+ // Common things to register for any Asynch Operation. We need to
+ // call the base class' <open> method.
+ int result_open = this->ACE_Asynch_Operation::open (handler,
+ handle,
+ completion_key,
+ proactor);
+ if (result_open < 0)
+ return result_open;
+
+ // Register the handle with the reactor.
+ this->reactor_.register_handler (this->handle_,
+ this->accept_handler_,
+ ACE_Event_Handler::ACCEPT_MASK);
+
+ // Suspend the <handle> now. Enable only when the <accept> is issued
+ // by the application.
+ this->reactor_.suspend_handlers ();
+
+ // Spawn the thread. It is the only thread we are going to have. It
+ // will do the <handle_events> on the reactor.
+ ACE_Thread_Manager::instance ()->spawn (ACE_Asynch_Accept::thread_function,
+ (void *)&this->reactor_);
}
int
@@ -774,7 +941,10 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
const void *act)
{
#if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) || (defined (ACE_HAS_AIO_CALLS))
-
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::accept called\n"));
+
// Common code for both WIN and POSIX.
// Sanity check: make sure that enough space has been allocated by
@@ -807,6 +977,7 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
}
#endif /* Not ACE_HAS_AIO_CALLS */
+ // Common code for both WIN and POSIX.
Result *result = 0;
ACE_NEW_RETURN (result,
Result (*this->handler_,
@@ -821,23 +992,9 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
#if defined (ACE_HAS_AIO_CALLS)
// Code specific to the POSIX Implementation.
- // Make the auxillary class for doing accept and initiate
- // accept. This class will be delete itself when the accept succeeds
- // or some failures occur there.
- ACE_Asynch_Accept_Handler *accept_handler = 0;
-
- ACE_NEW_RETURN (accept_handler,
- ::ACE_Asynch_Accept_Handler (result),
- -1);
+ // Register this <accept> call with the local handler.
+ this->accept_handler_->register_accept_call (result);
- int return_val = accept_handler->activate (THR_NEW_LWP |
- THR_DETACHED);
- if (return_val == -1)
- // Something went wrong.
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:%p:\n",
- "Could not create thread to do Asynch_Accept"),
- -1);
return 0;
#else /* Not ACE_HAS_AIO_CALLS */
// Code specific to WIN platforms.
@@ -886,6 +1043,31 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
#endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) || (defined (ACE_HAS_AIO_CALLS) */
}
+void*
+ACE_Asynch_Accept::thread_function (void* arg_reactor)
+{
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::thread_function called\n"));
+
+ // Retrieve the reactor pointer from the argument.
+ ACE_Reactor* reactor = (ACE_Reactor *) arg_reactor;
+
+ // For this reactor, this thread is the owner.
+ reactor->owner (ACE_OS::thr_self ());
+
+ // Handle events.
+ int result = 0;
+ while (result != -1)
+ {
+ result = reactor->handle_events ();
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Accept::Thread_Function : handle_events : result = [%d]\n",
+ result));
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Exiting ACE_Asynch_Accept::thread_function \n"));
+ return 0;
+}
+
// ************************************************************
u_long
@@ -949,22 +1131,86 @@ ACE_Asynch_Accept::Result::complete (u_long bytes_transferred,
// ************************************************************
#if defined (ACE_HAS_AIO_CALLS)
-ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (ACE_Asynch_Accept::Result *result)
- : result_ (result)
+ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (ACE_Reactor* reactor)
+ : reactor_ (reactor)
{
}
ACE_Asynch_Accept_Handler::~ACE_Asynch_Accept_Handler (void)
{
- // We shouldnt delete the result_ inside, because the completion is
- // going thru the proactor's handle_events, it will delete the
- // result after the calling the application specific code.
+}
+
+ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (void)
+{
+}
+
+int
+ACE_Asynch_Accept_Handler::register_accept_call (ACE_Asynch_Accept::Result* result)
+{
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::register_accept_call called\n"));
+
+ // The queue is updated by main thread in the register function call and
+ // thru the auxillary thread in the deregister fun. So let us mutex
+ // it.
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+
+ // Insert this result to the queue.
+ int insert_result = this->result_queue_.enqueue_tail (result);
+ if (insert_result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:ACE_Asynch_Accept_Handler::register_accept_call failed\n"),
+ -1);
+
+ // If this is the only item, then it means there the set was empty
+ // before. So enable the <handle> in the reactor.
+ if (this->result_queue_.size () == 1)
+ this->reactor_->resume_handlers ();
+}
+
+ACE_Asynch_Accept::Result*
+ACE_Asynch_Accept_Handler::deregister_accept_call (void)
+{
+ // The queue is updated by main thread in the register function call and
+ // thru the auxillary thread in the deregister fun. So let us mutex
+ // it.
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0);
+
+ // Get the first item (result ptr) from the Queue.
+ ACE_Asynch_Accept::Result* result = 0;
+ int return_dequeue = this->result_queue_.dequeue_head (result);
+ if (return_dequeue == -1)
+ return 0;
+ // Sanity check.
+ if (result == 0)
+ return 0;
+
+ // Disable the <handle> in the reactor if no <accept>'s are
+ // pending.
+ if (this->result_queue_.size () == 0)
+ this->reactor_->suspend_handlers ();
+
+ // Return the result pointer.
+ return result;
}
int
-ACE_Asynch_Accept_Handler::svc (void)
+ACE_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd)
{
- // @@ Can this wait forever for accept?
+ // An <accept> has been sensed on the <listen_handle>. We should be
+ // able to just go ahead and do the <accept> now on this <fd>. This
+ // should be the same as the <listen_handle>.
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::handle_input called\n"));
+
+ // Deregister this info pertaining to this <accept> call.
+ ACE_Asynch_Accept::Result* result = this->deregister_accept_call ();
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t):ACE_Asynch_Accept_Handler::handle_input : fd = [%d], Result->listen_handle = [%d]\n",
+ fd,
+ result->listen_handle ()));
// There is not going to be any data read. So we can use the
// <message_block> itself to take the <remote_address> as well as
@@ -974,20 +1220,17 @@ ACE_Asynch_Accept_Handler::svc (void)
size_t buffer_size = sizeof (sockaddr_in) + sizeof (sockaddr);
// Parameters for the <accept> call.
- int *address_size = (int *)this->result_->message_block ().wr_ptr ();
+ int *address_size = (int *)result->message_block ().wr_ptr ();
*address_size = buffer_size;
// Increment the wr_ptr.
- this->result_->message_block ().wr_ptr (sizeof (int));
-
- // @@ Debugging.
- ACE_DEBUG ((LM_DEBUG,
- "Asynch_Accept_Handler::svc : Listen_handle = [%d]\n",
- this->result_->listen_handle ()));
+ result->message_block ().wr_ptr (sizeof (int));
// Issue <accept> now.
- ACE_HANDLE new_handle = ACE_OS::accept (this->result_->listen_handle (),
- (struct sockaddr *) this->result_->message_block ().wr_ptr (),
+ // @@ We shouldnt block here since we have already done poll/select
+ // thru reactor. But are we sure?
+ ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (),
+ (struct sockaddr *) result->message_block ().wr_ptr (),
address_size);
if (new_handle == ACE_INVALID_HANDLE)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -998,13 +1241,15 @@ ACE_Asynch_Accept_Handler::svc (void)
// Accept has completed.
// Update the <wr_ptr> for the <message block>.
- this->result_->message_block ().wr_ptr (*address_size);
+ result->message_block ().wr_ptr (*address_size);
// @@ Just debugging.
- ACE_DEBUG ((LM_DEBUG, "%N:%l:Address_size = [%d], New_handle = [%d]\n", *address_size, new_handle));
+ ACE_DEBUG ((LM_DEBUG,
+ "%N:%l:Address_size = [%d], New_handle = [%d]\n",
+ *address_size, new_handle));
// Store the new handle.
- this->result_->accept_handle_ = new_handle;
+ result->accept_handle_ = new_handle;
// Signal the main process about this completion.
@@ -1012,27 +1257,23 @@ ACE_Asynch_Accept_Handler::svc (void)
pid_t pid = ACE_OS::getpid ();
if (pid == (pid_t) -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "Error:(%P | %t):%p:<getpid> failed.\n"),
+ "Error:(%P | %t):%p\n",
+ "<getpid> failed."),
-1);
// Set the signal information.
sigval value;
- value.sival_ptr = (void *) this->result_;
+ value.sival_ptr = (void *) result;
// Queue the signal.
if (sigqueue (pid, ACE_SIG_AIO, value) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"Error:(%P | %t):%p:\n",
- "<sigqueue> failed.\n"),
+ "<sigqueue> failed"),
-1);
- // @@ Just debugging.
- ACE_DEBUG ((LM_DEBUG,
- "(%P | %t): Accept is done. Signal is queued. Exiting thread\n"));
-
return 0;
}
-
#endif /* ACE_HAS_AIO_CALLS */
// ************************************************************
@@ -1718,4 +1959,18 @@ ACE_Service_Handler::open (ACE_HANDLE,
{
}
+#if defined (ACE_HAS_AIO_CALLS)
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*>;
+template class ACE_Node<ACE_Asynch_Accept::Result*>;
+template class ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*>;
+template class ACE_Unbounded_Queue_Iterator<ACE_Asynch_Accept::Result*>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*>
+#pragma instantiate ACE_Node<ACE_Asynch_Accept::Result*>
+#pragma instantiate ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*>
+#pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_Asynch_Accept::Result*>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+#endif /* ACE_HAS_AIO_CALLS */
+
#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/
diff --git a/ace/Asynch_IO.h b/ace/Asynch_IO.h
index b6f61d0cd87..3e1cddd7d89 100644
--- a/ace/Asynch_IO.h
+++ b/ace/Asynch_IO.h
@@ -30,9 +30,10 @@
#include "ace/OS.h"
#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \
- (defined (ACE_HAS_AIO_CALLS))
+ (defined (ACE_HAS_AIO_CALLS))
#include "ace/Task.h"
+#include "ace/Reactor.h"
// Forward declarations
class ACE_Proactor;
@@ -498,6 +499,7 @@ public:
};
};
+
class ACE_Export ACE_Asynch_Accept : public ACE_Asynch_Operation
{
// = TITLE
@@ -514,6 +516,14 @@ public:
ACE_Asynch_Accept (void);
// A do nothing constructor.
+#if defined (ACE_HAS_AIO_CALLS)
+ int open (ACE_Handler &handler,
+ ACE_HANDLE handle = ACE_INVALID_HANDLE,
+ const void *completion_key = 0,
+ ACE_Proactor *proactor = 0);
+ // (We will call base class's <open> from here).
+#endif /* ACE_HAS_AIO_CALLS */
+
int accept (ACE_Message_Block &message_block,
u_long bytes_to_read,
ACE_HANDLE accept_handle = ACE_INVALID_HANDLE,
@@ -545,7 +555,7 @@ public:
#if defined (ACE_HAS_AIO_CALLS)
friend class ACE_Asynch_Accept_Handler;
- // The helper factory has oprivilages too.
+ // This factory does it all, so it needs spl privileges.
#endif /* ACE_HAS_AIO_CALLS */
u_long bytes_to_read (void) const;
@@ -594,36 +604,19 @@ public:
ACE_HANDLE accept_handle_;
// I/O handle for the new connection.
};
-};
+private:
#if defined (ACE_HAS_AIO_CALLS)
-class ACE_Export ACE_Asynch_Accept_Handler : public ACE_Task <ACE_NULL_SYNCH>
-{
- // = TITLE
- // For the POSIX implementation, this class takes care of doing
- // Asynch_Accept.
- //
- // = DESCRIPTION
- //
-public:
- ACE_Asynch_Accept_Handler (ACE_Asynch_Accept::Result *result);
- // Constructor.
-
- ~ACE_Asynch_Accept_Handler (void);
- // Destructor.
-
-protected:
- virtual int svc (void);
- // Run by a daemon thread to handle deferred processing. This method
- // will be doing the accept actually.
-
- ACE_Asynch_Accept::Result *result_;
- // The result pointer given by <ACE_Asynch_Transmit> class.
-
- int shutting_down_;
- // Flag used to indicate when we are shutting down.
+ static void* thread_function (void* reactor);
+ // The thread function that does handle events
+
+ ACE_Reactor reactor_;
+ // Reactor to wait on the <listen_handle>.
+
+ ACE_Asynch_Accept_Handler* accept_handler_;
+ // The Event Handler to do handle_input.
+#endif /* ACE_HAS_AIO_CALLS */
};
-#endif /* ACE_HAS_AIO_CALLS*/
class ACE_Export ACE_Asynch_Transmit_File : public ACE_Asynch_Operation
{
@@ -912,86 +905,6 @@ public:
// connections.
};
-#if defined (ACE_HAS_AIO_CALLS)
-class ACE_Export ACE_Asynch_Transmit_Handler : public ACE_Handler
-{
- // = TITLE
- // Auxillary handler for doing <Asynch_Transmit_File> in
- // Unix. <ACE_Asynch_Transmit_File> internally uses this.
- //
- // = DESCRIPTION
- // This is a helper class for implementing
- // <ACE_Asynch_Transmit_File> in Unix systems.
-public:
- ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result,
- ACE_Proactor *proactor);
- // Constructor. Result pointer will have all the information to do
- // the file transmission (socket, file, application handler, bytes
- // to write....) and the the <proactor> pointer tells this class
- // the <proactor> that is being used by the
- // Asynch_Transmit_Operation and the application.
-
- virtual ~ACE_Asynch_Transmit_Handler (void);
- // Destructor.
-
- int transmit (void);
- // Do the transmission. All the info to do the transmission is in
- // the <result> member.
-
-protected:
- virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
- // This is called when asynchronous writes from the socket complete.
-
- virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
- // This is called when asynchronous reads from the file complete.
-private:
- int initiate_read_file (void);
- // Issue asynch read from the file.
-
- ACE_Asynch_Transmit_File::Result *result_;
- // The asynch result pointer made from the initial transmit file
- // request.
-
- ACE_Proactor *proactor_;
- // The Proactor that is being used by the application handler and
- // so the Asynch_Transmit_File.
-
- ACE_Asynch_Read_File rf_;
- // To read from the file to be transmitted.
-
- ACE_Asynch_Write_Stream ws_;
- // Write stream to write the header, trailer and the data.
-
- ACE_Message_Block *mb_;
- // Message bloack used to do the txn.
-
- enum ACT
- {
- HEADER_ACT = 1,
- DATA_ACT = 2,
- TRAILER_ACT = 3
- };
-
- ACT header_act_;
- ACT data_act_;
- ACT trailer_act_;
- // ACT to transmit header, data and trailer.
-
- size_t file_offset_;
- // Current offset of the file being transmitted.
-
- size_t file_size_;
- // Total size of the file.
-
- size_t bytes_transferred_;
- // Number of bytes transferred on the stream.
-
- size_t transmit_file_done_;
- // Flag to indicate that the transmitting is over.
-};
-
-#endif /* ACE_HAS_AIO_CALLS */
-
#if defined (__ACE_INLINE__)
#include "ace/Asynch_IO.i"
#endif /* __ACE_INLINE__ */
diff --git a/examples/Reactor/Proactor/test_proactor.cpp b/examples/Reactor/Proactor/test_proactor.cpp
index 5807b795d36..04f611ba5d3 100644
--- a/examples/Reactor/Proactor/test_proactor.cpp
+++ b/examples/Reactor/Proactor/test_proactor.cpp
@@ -115,7 +115,9 @@ Receiver::open (ACE_HANDLE handle,
this->file_offset_ = 0;
// Open dump file (in OVERLAPPED mode)
- this->dump_file_ = ACE_OS::open (dump_file, O_CREAT | O_RDWR | O_TRUNC | FILE_FLAG_OVERLAPPED);
+ this->dump_file_ = ACE_OS::open (dump_file,
+ O_CREAT | O_RDWR | O_TRUNC | FILE_FLAG_OVERLAPPED,
+ 0644);
if (this->dump_file_ == ACE_INVALID_HANDLE)
{
ACE_ERROR ((LM_ERROR, "%p\n", "ACE_OS::open"));
@@ -314,7 +316,7 @@ Sender::Sender (void)
transmit_file_done_ (0)
{
// Moment of inspiration :-)
- static char *data = "Welcome to Irfan World! Irfan RULES here !!";
+ static char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
this->welcome_message_.init (data, ACE_OS::strlen (data));
this->welcome_message_.wr_ptr (ACE_OS::strlen (data));
}
@@ -386,7 +388,7 @@ Sender::transmit_file (void)
// Header and trailer data for the file
this->header_and_trailer_.header_and_trailer (&this->welcome_message_,
this->welcome_message_.length (),
- &this->welcome_message_,
+ this->welcome_message_.duplicate (),
this->welcome_message_.length ());
// Starting position