summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-17 16:35:06 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-17 16:35:06 +0000
commit0d7a4c90918697451fc1cd7ebb39102aa162b0f5 (patch)
treef8543faeadf492ba2fec99ef62e289382d9f97ea
parentff8c24a33730abc4e8703aab66a17c3d333541af (diff)
downloadATCD-0d7a4c90918697451fc1cd7ebb39102aa162b0f5.tar.gz
foo
-rw-r--r--ChangeLog-96b80
-rw-r--r--ace/Local_Name_Space_T.cpp4
-rw-r--r--ace/Memory_Pool.cpp2
-rw-r--r--ace/OS.cpp9
-rw-r--r--ace/OS.h3
-rw-r--r--ace/OS.i26
-rw-r--r--ace/ReactorEx.cpp105
-rw-r--r--ace/ReactorEx.h53
-rw-r--r--ace/Task.cpp2
-rw-r--r--ace/Task_T.i5
-rw-r--r--examples/Reactor/ReactorEx/test_reactorEx.cpp74
-rw-r--r--examples/Reactor/WFMO_Reactor/test_reactorEx.cpp74
12 files changed, 294 insertions, 143 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b
index 048e6151f7e..d867ad7a964 100644
--- a/ChangeLog-96b
+++ b/ChangeLog-96b
@@ -1,3 +1,82 @@
+<<<<<<< ChangeLog-96b
+Tue Dec 17 04:27:07 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * ace/ReactorEx: Added a new feature to the ReactorEx. If we
+ enable the wait_all flag when calling
+ ACE_ReactorEx::handle_events() *and* we give an
+ ACE_Event_Handler (this is a new final param to the call) then
+ the handle_signal() call will be invoked on this
+ "wait_all_callback" object when all the handles become signaled.
+ Moreover, we pass in the array of signaled handled to through
+ the siginfo_t parameter (see the following ChangeLog entry for
+ details). If there is no wait_all_callback param, then all the
+ handle_signal() methods are invoked on all the handles.
+
+ * ace/OS.h (siginfo_t): Augmented the siginfo_t interface so that
+ we can pass an array of signaled Win32 HANDLEs, in addition to
+ just a single HANDLE. This is used in the ReactorEx.
+
+ * examples/Reactor/ReactorEx/test_reactorEx.cpp: Added a number of
+ enhancements to this test program based on discussions with
+ Irfan, Karlheinz, Dieter, and Detlef.
+
+ * ace/Task_T.i (msg_queue): If we override the existing definition
+ of the Message_Queue in an ACE_Task then we need to delete the
+ existing Message_queue (if necessary and mark the Message_Queue
+ as no longer being a candidate for deletion (since we have
+ supplied our own definition). Irfan had added this earlier, but
+ it seemed to get lost...
+
+ * examples/Reactor/Proactor/test_proactor.cpp: The class called
+ STDIN_HANDLEr is misnamed since we don't read from stdin, we
+ read from a file. Therefore, I've changed this to be
+ Input_File_Handler.
+
+ * examples/Reactor/ReactorEx/test_{proactor,reactorEx}.cpp:
+ Changed misspellings of transfered to transferred.
+
+ * ace/Memory_Pool.cpp (ACE_MMAP_Memory_Pool): Since NT doesn't support
+ SIGSEGV thre's no point in even trying to register for this
+ signal!
+
+ * ace/OS.i: Reverted some lost UNICODE fixes -- thanks to Irfan
+ for finding these.
+
+ * ace/Local_Name_Space_T.cpp (create_manager_i): Removed a
+ debug statement since it may be causing problems with printing
+ UNICODE.
+
+Mon Dec 16 11:25:55 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * ace/OS.i (cuserid): Fixed the definition to ACE_OS::cuserid() so
+ that it uses LPTSTR. Thanks to Irfan for this fix.
+
+ * ace/Task.cpp (activate): In ACE_Task::activate() there is a possibility to actually
+ "reactivate" the task using the <force_activate> flag. The following
+ illustrates that ability:
+
+ if (this->thr_count_ > 0 && force_active == 0)
+ return 1; // Already active.
+ else
+ this->thr_count_ = n_threads;
+
+ The thing is that, when the task is running and we reactivate it
+ (actually we add threads) the command should be:
+
+ this->thr_count_ += n_threads;
+
+ rather than
+
+ this->thr_count_ = n_threads;
+
+ That way <this->thr_count_> holds the new number of threads currently
+ associated with the task. Thanks to Hamual for this fix.
+
+ * ace/OS.i (inet_aton): Placed the return 1 within the curly
+ braces to make the HP/UX compiler happy. Thanks to Kenny Want
+ for reporting this.
+
+=======
Mon Dec 16 12:56:43 1996 David L. Levine <levine@cs.wustl.edu>
* ace/OS.i: removed spurious "*/" after an #endif. Thanks to
@@ -44,6 +123,7 @@ Sun Dec 15 13:01:17 1996 David L. Levine <levine@cs.wustl.edu>
which causes a core dump with the g++/SunOS5.5 build if the string
is in the text segment.
+>>>>>>> 4.110
Sun Dec 15 10:29:20 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu>
* netsvcs/servers/svc.conf: Removed the "lib" prefix for the
diff --git a/ace/Local_Name_Space_T.cpp b/ace/Local_Name_Space_T.cpp
index d89cf054b81..0f6c66f651b 100644
--- a/ace/Local_Name_Space_T.cpp
+++ b/ace/Local_Name_Space_T.cpp
@@ -378,8 +378,7 @@ ACE_Local_Name_Space<ACE_MEM_POOL_2, LOCK>::create_manager_i (void)
ACE_OS::strcat (this->context_file_, ACE_DIRECTORY_SEPARATOR_STR);
ACE_OS::strcat (this->context_file_, this->name_options_->database ());
- ACE_DEBUG ((LM_DEBUG, "contextfile is %s\n",
- this->context_file_));
+ // ACE_DEBUG ((LM_DEBUG, "contextfile is %s\n", this->context_file_));
ACE_MEM_POOL_OPTIONS options (this->name_options_->base_address ());
@@ -437,7 +436,6 @@ ACE_Local_Name_Space<ACE_MEM_POOL_2, LOCK>::create_manager_i (void)
return 0;
}
-
template <ACE_MEM_POOL_1, class LOCK> int
ACE_Local_Name_Space<ACE_MEM_POOL_2, LOCK>::list_names (ACE_PWSTRING_SET &set,
const ACE_WString &pattern)
diff --git a/ace/Memory_Pool.cpp b/ace/Memory_Pool.cpp
index 5f671db8c75..22acf5d3809 100644
--- a/ace/Memory_Pool.cpp
+++ b/ace/Memory_Pool.cpp
@@ -128,8 +128,10 @@ ACE_MMAP_Memory_Pool::ACE_MMAP_Memory_Pool (LPCTSTR backing_store_name,
ACE_OS::strncpy (this->backing_store_name_, backing_store_name,
sizeof this->backing_store_name_);
+#if !defined (ACE_WIN32)
if (this->signal_handler_.register_handler (SIGSEGV, this) == -1)
ACE_ERROR ((LM_ERROR, "%p\n", this->backing_store_name_));
+#endif /* ACE_WIN32 */
}
// Compute the new file_offset of the backing store and commit the
diff --git a/ace/OS.cpp b/ace/OS.cpp
index e0b395e66a1..f79760c1762 100644
--- a/ace/OS.cpp
+++ b/ace/OS.cpp
@@ -1443,7 +1443,14 @@ spa (FUNCPTR entry, ...)
#if !defined (ACE_HAS_SIGINFO_T)
siginfo_t::siginfo_t (ACE_HANDLE handle)
- : si_handle_ (handle)
+ : si_handle_ (handle),
+ si_handles_ (&handle)
+{
+}
+
+siginfo_t::siginfo_t (ACE_HANDLE *handles)
+ : si_handle_ (handles[0]),
+ si_handles_ (handles)
{
}
#endif /* ACE_HAS_SIGINFO_T */
diff --git a/ace/OS.h b/ace/OS.h
index cd6259ad16c..759ab8617e6 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -1827,6 +1827,9 @@ struct ACE_Export siginfo_t
ACE_HANDLE si_handle_;
// Win32 HANDLE that has become signaled.
+
+ ACE_HANDLE *si_handles_;
+ // Array of Win32 HANDLEs all of which have become signaled.
};
#endif /* ACE_HAS_SIGINFO_T */
diff --git a/ace/OS.i b/ace/OS.i
index eb2d3b315e8..3be76c3cd98 100644
--- a/ace/OS.i
+++ b/ace/OS.i
@@ -666,8 +666,8 @@ ACE_OS::unlink (const char *path)
#endif /* VXWORKS */
}
-ACE_INLINE char *
-ACE_OS::cuserid (char *user, size_t maxlen)
+ACE_INLINE LPTSTR
+ACE_OS::cuserid (LPTSTR user, size_t maxlen)
{
// ACE_TRACE ("ACE_OS::cuserid");
#if defined (VXWORKS)
@@ -2675,8 +2675,10 @@ ACE_OS::inet_aton (const char *host_name, struct in_addr *addr)
&& ACE_OS::strcmp (host_name, "255.255.255.255") != 0)
return 0;
else if (addr != 0)
- ACE_OS::memcpy ((void *) addr, (void *) &ip_addr, sizeof ip_addr);
- return 1;
+ {
+ ACE_OS::memcpy ((void *) addr, (void *) &ip_addr, sizeof ip_addr);
+ return 1;
+ }
}
ACE_INLINE char *
@@ -4195,7 +4197,7 @@ ACE_OS::hostname (char name[], size_t maxnamelen)
{
// ACE_TRACE ("ACE_OS::uname");
#if defined (ACE_WIN32)
- ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::GetComputerName (name, LPDWORD (&maxnamelen)),
+ ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::GetComputerNameA (name, LPDWORD (&maxnamelen)),
ace_result_), int, -1);
#else /* !ACE_WIN32 */
struct utsname host_info;
@@ -4353,7 +4355,7 @@ ACE_OS::dlopen (ACE_DL_TYPE filename, int mode)
#elif defined (ACE_WIN32)
ACE_UNUSED_ARG(mode);
- ACE_OSCALL_RETURN (::LoadLibrary (filename), void *, 0);
+ ACE_OSCALL_RETURN (::LoadLibraryA (filename), void *, 0);
#else
ACE_NOTSUP_RETURN (0);
#endif /* ACE_HAS_SVR4_DYNAMIC_LINKING */
@@ -4860,7 +4862,7 @@ ACE_OS::shmget (key_t key, int size, int flags)
}
ACE_INLINE ACE_HANDLE
-ACE_OS::open (LPCTSTR filename,
+ACE_OS::open (const char *filename,
int mode,
int perms)
{
@@ -4890,11 +4892,11 @@ ACE_OS::open (LPCTSTR filename,
if (ACE_BIT_ENABLED (mode, _O_TEMPORARY))
flags |= FILE_FLAG_DELETE_ON_CLOSE;
- ACE_HANDLE h = ::CreateFile (filename, access,
- FILE_SHARE_READ | FILE_SHARE_WRITE,
- 0, creation,
- flags,
- 0);
+ ACE_HANDLE h = ::CreateFileA (filename, access,
+ FILE_SHARE_READ | FILE_SHARE_WRITE,
+ 0, creation,
+ flags,
+ 0);
if (h == ACE_INVALID_HANDLE)
{
diff --git a/ace/ReactorEx.cpp b/ace/ReactorEx.cpp
index 9801021e9a9..fb2969f69d2 100644
--- a/ace/ReactorEx.cpp
+++ b/ace/ReactorEx.cpp
@@ -123,7 +123,8 @@ ACE_ReactorEx::schedule_timer (ACE_Event_Handler *handler,
// how_long expired, and 1 if events were dispatched.
int
ACE_ReactorEx::handle_events (ACE_Time_Value *how_long,
- int wait_all)
+ int wait_all,
+ ACE_Event_Handler *wait_all_callback)
{
ACE_TRACE ("ACE_ReactorEx::handle_events");
@@ -169,61 +170,87 @@ ACE_ReactorEx::handle_events (ACE_Time_Value *how_long,
errno = ETIME;
return 0;
case WAIT_ABANDONED_0:
- // We'll let dispatch_all worry about abandoned mutexes.
+ // We'll let dispatch worry about abandoned mutexes.
default: // Dispatch.
- return this->dispatch_all (wait_status - WAIT_OBJECT_0, wait_all);
+ if (wait_all != 0)
+ return this->dispatch (wait_all_callback);
+ else
+ return this->dispatch (wait_status - WAIT_OBJECT_0);
+ }
+}
+
+int
+ACE_ReactorEx::dispatch (ACE_Event_Handler *wait_all_callback)
+{
+ if (wait_all_callback != 0)
+ {
+ siginfo_t handles (this->handles_);
+ if (wait_call_callback->handle_signal (0, &handles) == -1)
+ // Tim, what should happen if this call fails? Should all of
+ // the handles be removed?
+ return -1;
+ }
+ else
+ {
+ int result = 0;
+
+ for (int i = 0; i < this->active_handles_; i++)
+ if (this->dispatch_handler (i) == -1)
+ result = -1;
+
+ // Tim, if a result is != 0 should it contain a single -1, or
+ // perhaps the number of bad handler dispatches (negated, of
+ // course!).
+ return result;
}
}
// Dispatches any active handles from handles_[-index-] to
// handles_[active_handles_] using WaitForMultipleObjects to poll
// through our handle set looking for active handles.
+
int
-ACE_ReactorEx::dispatch_all (size_t index, int wait_all)
+ACE_ReactorEx::dispatch (size_t index)
{
- while (1)
+ while (index < active_handles_)
{
- if (this->dispatch_handler (index) == 0)
- index++;
-
- // Check if we're all out of handles.
- if (index == active_handles_)
- return 0;
-
- // If wait_all is TRUE, then we know that every handle is active
- // and there's no need to call WaitForMultipleObjects; We just
- // iterate through and dispatch each handler.
- if (wait_all == 0)
+ // Tim, if this call fails is there really anything we
+ // can/should do about it? It seems that regardless of the
+ // success or failure, we should increment the index since
+ // otherwise we might just iterate endlessly!
+ this->dispatch_handler (index);
+ index++;
+
+ DWORD wait_status =
+ ::WaitForMultipleObjects (active_handles_ - index,
+ &handles_[index],
+ FALSE, 0); // We're polling.
+
+ switch (wait_status)
{
- DWORD wait_status =
- ::WaitForMultipleObjects (active_handles_ - index,
- &(handles_[index]),
- FALSE, 0); // We're polling.
-
- switch (wait_status)
- {
- case WAIT_FAILED: // Failure.
- errno = ::GetLastError ();
- return -1;
- case WAIT_TIMEOUT:
- // There are no more handles ready, we can return.
- return 0;
- default: // Dispatch.
- // Check if a handle successfully became signaled.
- if ((wait_status >= WAIT_OBJECT_0) &&
- (wait_status < WAIT_OBJECT_0 + active_handles_))
- index += (wait_status - WAIT_OBJECT_0);
- else
- // Otherwise, a handle was abandoned.
- index += (wait_status - WAIT_ABANDONED_0);
- }
+ case WAIT_FAILED: // Failure.
+ errno = ::GetLastError ();
+ return -1;
+ case WAIT_TIMEOUT:
+ // There are no more handles ready, we can return.
+ return 0;
+ default: // Dispatch.
+ // Check if a handle successfully became signaled.
+ if (wait_status >= WAIT_OBJECT_0 &&
+ wait_status < WAIT_OBJECT_0 + active_handles_)
+ index += wait_status - WAIT_OBJECT_0;
+ else
+ // Otherwise, a handle was abandoned.
+ index += waitstatus - WAIT_ABANDONED_0;
}
}
-}
+ return 0;
+}
// Dispatches a single handler. Returns 0 on success, -1 if the
// handler was removed.
+
int
ACE_ReactorEx::dispatch_handler (int index)
{
diff --git a/ace/ReactorEx.h b/ace/ReactorEx.h
index e51f9edfac3..6c4ec881b8a 100644
--- a/ace/ReactorEx.h
+++ b/ace/ReactorEx.h
@@ -130,25 +130,30 @@ public:
virtual ~ACE_ReactorEx (void);
// Close down the ReactorEx and release all of its resources.
- // = Event loop drivers.
- // Main event loop driver that blocks for -how_long- before
- // returning (will return earlier if I/O or signal events occur).
- // Note that -how_long- can be 0, in which case this method blocks
- // until I/O events or signals occur. Returns 0 if timed out, 1 if
- // an event occurred, and -1 if an error occured. -how_long- is
- // decremented to reflect how much time the call to handle_events
- // took. For instance, if a time value of 3 seconds is passed to
- // handle_events and an event occurs after 2 seconds, -how_long-
- // will equal 1 second. This can be used if an application wishes
- // to handle events for some fixed amount of time. If wait_all is
- // TRUE, then handle_events will only dispatch the handlers if *all*
- // handles become active. If a timeout occurs, then no handlers
- // will be dispatched.
+ // = Event loop drivers. Main event loop driver that blocks for
+ // -how_long- before returning (will return earlier if I/O or signal
+ // events occur). Note that -how_long- can be 0, in which case this
+ // method blocks until I/O events or signals occur. Returns 0 if
+ // timed out, 1 if an event occurred, and -1 if an error occured.
+ // -how_long- is decremented to reflect how much time the call to
+ // handle_events took. For instance, if a time value of 3 seconds
+ // is passed to handle_events and an event occurs after 2 seconds,
+ // -how_long- will equal 1 second. This can be used if an
+ // application wishes to handle events for some fixed amount of
+ // time. If wait_all is TRUE, then handle_events will only dispatch
+ // the handlers if *all* handles become active. If a timeout
+ // occurs, then no handlers will be dispatched. If
+ // <wait_all_callback> is NULL then we dispatch the <handle_signal>
+ // method on each and every HANDLE in the dispatch array.
+ // Otherwise, we just call back the <handle_signal> method of the
+ // <wait_all_callback> object, after first assigning the siginfo_t
+ // <si_handles_> argument to point to the array of signaled handles.
virtual int handle_events (ACE_Time_Value *how_long = 0,
- int wait_all = 0);
+ int wait_all = 0,
+ ACE_Event_Handler *wait_all_callback = 0);
virtual int handle_events (ACE_Time_Value &how_long,
- int wait_all = 0);
-
+ int wait_all = 0,
+ ACE_Event_Handler *wait_all_callback = 0);
// = Register and remove Handlers.
virtual int register_handler (ACE_Event_Handler *eh,
@@ -204,11 +209,21 @@ public:
// Declare the dynamic allocation hooks.
protected:
- int dispatch_all (size_t index, int wait_all);
+ int dispatch (size_t index);
// Dispatches any active handles from handles_[-index-] to
- // handles_[active_handles_] using WaitForMultipleObjects to poll
+ // handles_[active_handles_] using <WaitForMultipleObjects> to poll
// through our handle set looking for active handles.
+ int dispatch (ACE_Event_Handler *wait_all_callback);
+ // This is called when the user called handle_events() with the
+ // <wait_all> parameter enabled. In this case, all the handlers are
+ // now signaled. If <wait_all_callback> is NULL then we dispatch
+ // the <handle_signal> method on each and every HANDLE in the
+ // dispatch array. Otherwise, we just call back the <handle_signal>
+ // method of the <wait_all_callback> object, after first assigning
+ // the siginfo_t <si_handles_> argument to point to the array of
+ // signaled handles.
+
int dispatch_handler (int index);
// Dispatches a single handler. Returns 0 on success, -1 if the
// handler was removed.
diff --git a/ace/Task.cpp b/ace/Task.cpp
index 8152f66fe9d..3046b40a9c3 100644
--- a/ace/Task.cpp
+++ b/ace/Task.cpp
@@ -175,7 +175,7 @@ ACE_Task_Base::activate (long flags,
if (this->thr_count_ > 0 && force_active == 0)
return 1; // Already active.
else
- this->thr_count_ = n_threads;
+ this->thr_count_ += n_threads;
// Use the ACE_Thread_Manager singleton if we're running as an
// active object and the caller didn't supply us with a
diff --git a/ace/Task_T.i b/ace/Task_T.i
index 9f5ba06054e..1a8837f24a3 100644
--- a/ace/Task_T.i
+++ b/ace/Task_T.i
@@ -57,6 +57,11 @@ template <ACE_SYNCH_1> ACE_INLINE void
ACE_Task<ACE_SYNCH_2>::msg_queue (ACE_Message_Queue<ACE_SYNCH_2> *mq)
{
ACE_TRACE ("ACE_Task<ACE_SYNCH_2>::msg_queue");
+ if (this->delete_msg_queue_)
+ {
+ delete this->msg_queue_;
+ this->delete_msg_queue_ = 0;
+ }
this->msg_queue_ = mq;
}
diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp
index 488fa1e683a..295b36ffda0 100644
--- a/examples/Reactor/ReactorEx/test_reactorEx.cpp
+++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp
@@ -34,7 +34,6 @@
#include "ace/Service_Config.h"
#include "ace/Synch.h"
#include "ace/Task.h"
-#include "ace/OS.h"
typedef ACE_Task<ACE_MT_SYNCH> MT_TASK;
@@ -44,6 +43,7 @@ class Peer_Handler : public MT_TASK
// and forward them to the server using proactive I/O.
{
public:
+ // = Initialization methods.
Peer_Handler (int argc, char *argv[]);
~Peer_Handler (void);
@@ -53,12 +53,12 @@ public:
// hostname was specified from the command line.
virtual int handle_output_complete (ACE_Message_Block *msg,
- long bytes_transfered);
+ long bytes_transferred);
// One of our asynchronous writes to the remote peer has completed.
// Make sure it succeeded and then delete the message.
virtual int handle_input_complete (ACE_Message_Block *msg,
- long bytes_transfered);
+ long bytes_transferred);
// The remote peer has sent us something. If it succeeded, print
// out the message and reinitiate a read. Otherwise, fail. In both
// cases, delete the message sent.
@@ -73,14 +73,16 @@ public:
// We've been removed from the ReactorEx.
virtual int handle_output (ACE_HANDLE fd);
- // Called when output events should start
+ // Called when output events should start. Note that this is
+ // automatically invoked by the
+ // <ACE_ReactorEx_Notificiation_Strategy>.
private:
ACE_SOCK_Stream stream_;
// Socket that we have connected to the server.
ACE_ReactorEx_Notification_Strategy strategy_;
- // strategy created such that the reactorEx notifies us when
+ // The strategy object that the reactorEx uses to notify us when
// something is added to the queue.
// = Remote peer info.
@@ -89,9 +91,6 @@ private:
u_short port_;
// Port number for remote host.
-
- // = Make Task happy.
- int close (u_long) { return 0; }
};
class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH>
@@ -101,6 +100,7 @@ class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH>
{
public:
STDIN_Handler (MT_TASK &ph);
+ // Initialization.
virtual int open (void * = 0);
// Activate object.
@@ -109,17 +109,12 @@ public:
// Shut down.
int svc (void);
- // Thread runs here.
+ // Thread runs here as an active object.
private:
- MT_TASK &ph_;
- // Send all input to ph_.
-
static void handler (int signum);
- // Handle a ^C. (Do nothing).
-
- int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
- // Make Task happy.
+ // Handle a ^C. (Do nothing, this just illustrates how we can catch
+ // signals along with the other things).
void register_thread_exit_hook (void);
// Helper function to register with the ReactorEx for thread exit.
@@ -127,6 +122,12 @@ private:
virtual int handle_signal (int index, siginfo_t *, ucontext_t *);
// The STDIN thread has exited. This means the user hit ^C. We can
// end the event loop.
+
+ MT_TASK &ph_;
+ // Send all input to ph_.
+
+ ACE_HANDLE thr_handle_;
+ // Handle of our thread.
};
Peer_Handler::Peer_Handler (int argc, char *argv[])
@@ -139,10 +140,7 @@ Peer_Handler::Peer_Handler (int argc, char *argv[])
// This code sets up the message to notify us when a new message is
// added to the queue. Actually, the queue notifies ReactorEx which
// then notifies us.
- ACE_Message_Queue<ACE_MT_SYNCH>* mq;
- ACE_NEW (mq, ACE_Message_Queue<ACE_MT_SYNCH>);
- mq->notification_strategy (&this->strategy_);
- this->msg_queue (mq);
+ this->msg_queue ()->notification_strategy (&this->strategy_);
ACE_Get_Opt get_opt (argc, argv, "h:p:");
int c;
@@ -163,8 +161,6 @@ Peer_Handler::Peer_Handler (int argc, char *argv[])
Peer_Handler::~Peer_Handler (void)
{
- ACE_Message_Queue<Peer_Handler::SYNCH>* mq = this->msg_queue ();
- delete mq;
}
// This method creates the network connection to the remote peer. It
@@ -206,11 +202,11 @@ Peer_Handler::open (void *)
int
Peer_Handler::handle_output_complete (ACE_Message_Block *msg,
- long bytes_transfered)
+ long bytes_transferred)
{
- if (bytes_transfered <= 0)
+ if (bytes_transferred <= 0)
ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed",
- bytes_transfered));
+ bytes_transferred));
// This was allocated by the STDIN_Handler, queued, dequeued,
// passed to the proactor, and now passed back to us.
@@ -224,11 +220,11 @@ Peer_Handler::handle_output_complete (ACE_Message_Block *msg,
int
Peer_Handler::handle_input_complete (ACE_Message_Block *msg,
- long bytes_transfered)
+ long bytes_transferred)
{
- if ((bytes_transfered > 0) && (msg->length () > 0))
+ if (bytes_transferred > 0 && msg->length () > 0)
{
- msg->rd_ptr ()[bytes_transfered] = '\0';
+ msg->rd_ptr ()[bytes_transferred] = '\0';
// Print out the message received from the server.
ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ()));
delete msg;
@@ -279,6 +275,7 @@ Peer_Handler::handle_output (ACE_HANDLE fd)
ACE_Time_Value tv (ACE_Time_Value::zero);
+ // Forward the message to the remote peer receiver.
if (this->getq (mb, &tv) != -1)
{
if (ACE_Service_Config::proactor ()->
@@ -345,18 +342,25 @@ STDIN_Handler::svc (void)
if (read_result > 0)
{
mb->wr_ptr (read_result);
+ // Note that this call will first enqueue mb onto the peer
+ // handler's message queue, which will then turn around and
+ // notify the ReactorEx via the Notification_Strategy. This
+ // will subsequently signal the Peer_Handler, which will
+ // react by calling back to its handle_output() method,
+ // which dequeues the message and sends it to the peer
+ // across the network.
this->ph_.putq (mb);
}
else
break;
}
- // handle_signal will get called.
- return 42;
+ // handle_signal will get called on the main proactor thread since
+ // we just exited and the main thread is waiting on our thread exit.
+ return 0;
}
-// Register an exit hook with the reactorEx. All this junk is testing
-// out how ACE_Thread::self (hthread_id&) doesn't work!!
+// Register an exit hook with the reactorEx.
void
STDIN_Handler::register_thread_exit_hook (void)
@@ -381,9 +385,10 @@ STDIN_Handler::register_thread_exit_hook (void)
// end the event loop and delete ourself.
int
-STDIN_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
+STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *)
{
ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n"));
+ ACE_ASSERT (this->thr_handle_ == si->si_handle_);
ACE_Service_Config::end_reactorEx_event_loop ();
return 0;
}
@@ -403,6 +408,7 @@ main (int argc, char *argv[])
// Open active object for reading from stdin.
STDIN_Handler stdin_handler (peer_handler);
+ // Spawn thread.
if (stdin_handler.open () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p open failed, errno = %d.\n",
@@ -418,5 +424,5 @@ main (int argc, char *argv[])
// Run main event demultiplexor.
ACE_Service_Config::run_reactorEx_event_loop ();
- return 42;
+ return 0;
}
diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
index 488fa1e683a..295b36ffda0 100644
--- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
+++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
@@ -34,7 +34,6 @@
#include "ace/Service_Config.h"
#include "ace/Synch.h"
#include "ace/Task.h"
-#include "ace/OS.h"
typedef ACE_Task<ACE_MT_SYNCH> MT_TASK;
@@ -44,6 +43,7 @@ class Peer_Handler : public MT_TASK
// and forward them to the server using proactive I/O.
{
public:
+ // = Initialization methods.
Peer_Handler (int argc, char *argv[]);
~Peer_Handler (void);
@@ -53,12 +53,12 @@ public:
// hostname was specified from the command line.
virtual int handle_output_complete (ACE_Message_Block *msg,
- long bytes_transfered);
+ long bytes_transferred);
// One of our asynchronous writes to the remote peer has completed.
// Make sure it succeeded and then delete the message.
virtual int handle_input_complete (ACE_Message_Block *msg,
- long bytes_transfered);
+ long bytes_transferred);
// The remote peer has sent us something. If it succeeded, print
// out the message and reinitiate a read. Otherwise, fail. In both
// cases, delete the message sent.
@@ -73,14 +73,16 @@ public:
// We've been removed from the ReactorEx.
virtual int handle_output (ACE_HANDLE fd);
- // Called when output events should start
+ // Called when output events should start. Note that this is
+ // automatically invoked by the
+ // <ACE_ReactorEx_Notificiation_Strategy>.
private:
ACE_SOCK_Stream stream_;
// Socket that we have connected to the server.
ACE_ReactorEx_Notification_Strategy strategy_;
- // strategy created such that the reactorEx notifies us when
+ // The strategy object that the reactorEx uses to notify us when
// something is added to the queue.
// = Remote peer info.
@@ -89,9 +91,6 @@ private:
u_short port_;
// Port number for remote host.
-
- // = Make Task happy.
- int close (u_long) { return 0; }
};
class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH>
@@ -101,6 +100,7 @@ class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH>
{
public:
STDIN_Handler (MT_TASK &ph);
+ // Initialization.
virtual int open (void * = 0);
// Activate object.
@@ -109,17 +109,12 @@ public:
// Shut down.
int svc (void);
- // Thread runs here.
+ // Thread runs here as an active object.
private:
- MT_TASK &ph_;
- // Send all input to ph_.
-
static void handler (int signum);
- // Handle a ^C. (Do nothing).
-
- int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
- // Make Task happy.
+ // Handle a ^C. (Do nothing, this just illustrates how we can catch
+ // signals along with the other things).
void register_thread_exit_hook (void);
// Helper function to register with the ReactorEx for thread exit.
@@ -127,6 +122,12 @@ private:
virtual int handle_signal (int index, siginfo_t *, ucontext_t *);
// The STDIN thread has exited. This means the user hit ^C. We can
// end the event loop.
+
+ MT_TASK &ph_;
+ // Send all input to ph_.
+
+ ACE_HANDLE thr_handle_;
+ // Handle of our thread.
};
Peer_Handler::Peer_Handler (int argc, char *argv[])
@@ -139,10 +140,7 @@ Peer_Handler::Peer_Handler (int argc, char *argv[])
// This code sets up the message to notify us when a new message is
// added to the queue. Actually, the queue notifies ReactorEx which
// then notifies us.
- ACE_Message_Queue<ACE_MT_SYNCH>* mq;
- ACE_NEW (mq, ACE_Message_Queue<ACE_MT_SYNCH>);
- mq->notification_strategy (&this->strategy_);
- this->msg_queue (mq);
+ this->msg_queue ()->notification_strategy (&this->strategy_);
ACE_Get_Opt get_opt (argc, argv, "h:p:");
int c;
@@ -163,8 +161,6 @@ Peer_Handler::Peer_Handler (int argc, char *argv[])
Peer_Handler::~Peer_Handler (void)
{
- ACE_Message_Queue<Peer_Handler::SYNCH>* mq = this->msg_queue ();
- delete mq;
}
// This method creates the network connection to the remote peer. It
@@ -206,11 +202,11 @@ Peer_Handler::open (void *)
int
Peer_Handler::handle_output_complete (ACE_Message_Block *msg,
- long bytes_transfered)
+ long bytes_transferred)
{
- if (bytes_transfered <= 0)
+ if (bytes_transferred <= 0)
ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed",
- bytes_transfered));
+ bytes_transferred));
// This was allocated by the STDIN_Handler, queued, dequeued,
// passed to the proactor, and now passed back to us.
@@ -224,11 +220,11 @@ Peer_Handler::handle_output_complete (ACE_Message_Block *msg,
int
Peer_Handler::handle_input_complete (ACE_Message_Block *msg,
- long bytes_transfered)
+ long bytes_transferred)
{
- if ((bytes_transfered > 0) && (msg->length () > 0))
+ if (bytes_transferred > 0 && msg->length () > 0)
{
- msg->rd_ptr ()[bytes_transfered] = '\0';
+ msg->rd_ptr ()[bytes_transferred] = '\0';
// Print out the message received from the server.
ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ()));
delete msg;
@@ -279,6 +275,7 @@ Peer_Handler::handle_output (ACE_HANDLE fd)
ACE_Time_Value tv (ACE_Time_Value::zero);
+ // Forward the message to the remote peer receiver.
if (this->getq (mb, &tv) != -1)
{
if (ACE_Service_Config::proactor ()->
@@ -345,18 +342,25 @@ STDIN_Handler::svc (void)
if (read_result > 0)
{
mb->wr_ptr (read_result);
+ // Note that this call will first enqueue mb onto the peer
+ // handler's message queue, which will then turn around and
+ // notify the ReactorEx via the Notification_Strategy. This
+ // will subsequently signal the Peer_Handler, which will
+ // react by calling back to its handle_output() method,
+ // which dequeues the message and sends it to the peer
+ // across the network.
this->ph_.putq (mb);
}
else
break;
}
- // handle_signal will get called.
- return 42;
+ // handle_signal will get called on the main proactor thread since
+ // we just exited and the main thread is waiting on our thread exit.
+ return 0;
}
-// Register an exit hook with the reactorEx. All this junk is testing
-// out how ACE_Thread::self (hthread_id&) doesn't work!!
+// Register an exit hook with the reactorEx.
void
STDIN_Handler::register_thread_exit_hook (void)
@@ -381,9 +385,10 @@ STDIN_Handler::register_thread_exit_hook (void)
// end the event loop and delete ourself.
int
-STDIN_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
+STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *)
{
ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n"));
+ ACE_ASSERT (this->thr_handle_ == si->si_handle_);
ACE_Service_Config::end_reactorEx_event_loop ();
return 0;
}
@@ -403,6 +408,7 @@ main (int argc, char *argv[])
// Open active object for reading from stdin.
STDIN_Handler stdin_handler (peer_handler);
+ // Spawn thread.
if (stdin_handler.open () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p open failed, errno = %d.\n",
@@ -418,5 +424,5 @@ main (int argc, char *argv[])
// Run main event demultiplexor.
ACE_Service_Config::run_reactorEx_event_loop ();
- return 42;
+ return 0;
}