diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1996-12-17 16:35:06 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1996-12-17 16:35:06 +0000 |
commit | 0d7a4c90918697451fc1cd7ebb39102aa162b0f5 (patch) | |
tree | f8543faeadf492ba2fec99ef62e289382d9f97ea | |
parent | ff8c24a33730abc4e8703aab66a17c3d333541af (diff) | |
download | ATCD-0d7a4c90918697451fc1cd7ebb39102aa162b0f5.tar.gz |
foo
-rw-r--r-- | ChangeLog-96b | 80 | ||||
-rw-r--r-- | ace/Local_Name_Space_T.cpp | 4 | ||||
-rw-r--r-- | ace/Memory_Pool.cpp | 2 | ||||
-rw-r--r-- | ace/OS.cpp | 9 | ||||
-rw-r--r-- | ace/OS.h | 3 | ||||
-rw-r--r-- | ace/OS.i | 26 | ||||
-rw-r--r-- | ace/ReactorEx.cpp | 105 | ||||
-rw-r--r-- | ace/ReactorEx.h | 53 | ||||
-rw-r--r-- | ace/Task.cpp | 2 | ||||
-rw-r--r-- | ace/Task_T.i | 5 | ||||
-rw-r--r-- | examples/Reactor/ReactorEx/test_reactorEx.cpp | 74 | ||||
-rw-r--r-- | examples/Reactor/WFMO_Reactor/test_reactorEx.cpp | 74 |
12 files changed, 294 insertions, 143 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b index 048e6151f7e..d867ad7a964 100644 --- a/ChangeLog-96b +++ b/ChangeLog-96b @@ -1,3 +1,82 @@ +<<<<<<< ChangeLog-96b +Tue Dec 17 04:27:07 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * ace/ReactorEx: Added a new feature to the ReactorEx. If we + enable the wait_all flag when calling + ACE_ReactorEx::handle_events() *and* we give an + ACE_Event_Handler (this is a new final param to the call) then + the handle_signal() call will be invoked on this + "wait_all_callback" object when all the handles become signaled. + Moreover, we pass in the array of signaled handled to through + the siginfo_t parameter (see the following ChangeLog entry for + details). If there is no wait_all_callback param, then all the + handle_signal() methods are invoked on all the handles. + + * ace/OS.h (siginfo_t): Augmented the siginfo_t interface so that + we can pass an array of signaled Win32 HANDLEs, in addition to + just a single HANDLE. This is used in the ReactorEx. + + * examples/Reactor/ReactorEx/test_reactorEx.cpp: Added a number of + enhancements to this test program based on discussions with + Irfan, Karlheinz, Dieter, and Detlef. + + * ace/Task_T.i (msg_queue): If we override the existing definition + of the Message_Queue in an ACE_Task then we need to delete the + existing Message_queue (if necessary and mark the Message_Queue + as no longer being a candidate for deletion (since we have + supplied our own definition). Irfan had added this earlier, but + it seemed to get lost... + + * examples/Reactor/Proactor/test_proactor.cpp: The class called + STDIN_HANDLEr is misnamed since we don't read from stdin, we + read from a file. Therefore, I've changed this to be + Input_File_Handler. + + * examples/Reactor/ReactorEx/test_{proactor,reactorEx}.cpp: + Changed misspellings of transfered to transferred. + + * ace/Memory_Pool.cpp (ACE_MMAP_Memory_Pool): Since NT doesn't support + SIGSEGV thre's no point in even trying to register for this + signal! + + * ace/OS.i: Reverted some lost UNICODE fixes -- thanks to Irfan + for finding these. + + * ace/Local_Name_Space_T.cpp (create_manager_i): Removed a + debug statement since it may be causing problems with printing + UNICODE. + +Mon Dec 16 11:25:55 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * ace/OS.i (cuserid): Fixed the definition to ACE_OS::cuserid() so + that it uses LPTSTR. Thanks to Irfan for this fix. + + * ace/Task.cpp (activate): In ACE_Task::activate() there is a possibility to actually + "reactivate" the task using the <force_activate> flag. The following + illustrates that ability: + + if (this->thr_count_ > 0 && force_active == 0) + return 1; // Already active. + else + this->thr_count_ = n_threads; + + The thing is that, when the task is running and we reactivate it + (actually we add threads) the command should be: + + this->thr_count_ += n_threads; + + rather than + + this->thr_count_ = n_threads; + + That way <this->thr_count_> holds the new number of threads currently + associated with the task. Thanks to Hamual for this fix. + + * ace/OS.i (inet_aton): Placed the return 1 within the curly + braces to make the HP/UX compiler happy. Thanks to Kenny Want + for reporting this. + +======= Mon Dec 16 12:56:43 1996 David L. Levine <levine@cs.wustl.edu> * ace/OS.i: removed spurious "*/" after an #endif. Thanks to @@ -44,6 +123,7 @@ Sun Dec 15 13:01:17 1996 David L. Levine <levine@cs.wustl.edu> which causes a core dump with the g++/SunOS5.5 build if the string is in the text segment. +>>>>>>> 4.110 Sun Dec 15 10:29:20 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu> * netsvcs/servers/svc.conf: Removed the "lib" prefix for the diff --git a/ace/Local_Name_Space_T.cpp b/ace/Local_Name_Space_T.cpp index d89cf054b81..0f6c66f651b 100644 --- a/ace/Local_Name_Space_T.cpp +++ b/ace/Local_Name_Space_T.cpp @@ -378,8 +378,7 @@ ACE_Local_Name_Space<ACE_MEM_POOL_2, LOCK>::create_manager_i (void) ACE_OS::strcat (this->context_file_, ACE_DIRECTORY_SEPARATOR_STR); ACE_OS::strcat (this->context_file_, this->name_options_->database ()); - ACE_DEBUG ((LM_DEBUG, "contextfile is %s\n", - this->context_file_)); + // ACE_DEBUG ((LM_DEBUG, "contextfile is %s\n", this->context_file_)); ACE_MEM_POOL_OPTIONS options (this->name_options_->base_address ()); @@ -437,7 +436,6 @@ ACE_Local_Name_Space<ACE_MEM_POOL_2, LOCK>::create_manager_i (void) return 0; } - template <ACE_MEM_POOL_1, class LOCK> int ACE_Local_Name_Space<ACE_MEM_POOL_2, LOCK>::list_names (ACE_PWSTRING_SET &set, const ACE_WString &pattern) diff --git a/ace/Memory_Pool.cpp b/ace/Memory_Pool.cpp index 5f671db8c75..22acf5d3809 100644 --- a/ace/Memory_Pool.cpp +++ b/ace/Memory_Pool.cpp @@ -128,8 +128,10 @@ ACE_MMAP_Memory_Pool::ACE_MMAP_Memory_Pool (LPCTSTR backing_store_name, ACE_OS::strncpy (this->backing_store_name_, backing_store_name, sizeof this->backing_store_name_); +#if !defined (ACE_WIN32) if (this->signal_handler_.register_handler (SIGSEGV, this) == -1) ACE_ERROR ((LM_ERROR, "%p\n", this->backing_store_name_)); +#endif /* ACE_WIN32 */ } // Compute the new file_offset of the backing store and commit the diff --git a/ace/OS.cpp b/ace/OS.cpp index e0b395e66a1..f79760c1762 100644 --- a/ace/OS.cpp +++ b/ace/OS.cpp @@ -1443,7 +1443,14 @@ spa (FUNCPTR entry, ...) #if !defined (ACE_HAS_SIGINFO_T) siginfo_t::siginfo_t (ACE_HANDLE handle) - : si_handle_ (handle) + : si_handle_ (handle), + si_handles_ (&handle) +{ +} + +siginfo_t::siginfo_t (ACE_HANDLE *handles) + : si_handle_ (handles[0]), + si_handles_ (handles) { } #endif /* ACE_HAS_SIGINFO_T */ @@ -1827,6 +1827,9 @@ struct ACE_Export siginfo_t ACE_HANDLE si_handle_; // Win32 HANDLE that has become signaled. + + ACE_HANDLE *si_handles_; + // Array of Win32 HANDLEs all of which have become signaled. }; #endif /* ACE_HAS_SIGINFO_T */ @@ -666,8 +666,8 @@ ACE_OS::unlink (const char *path) #endif /* VXWORKS */ } -ACE_INLINE char * -ACE_OS::cuserid (char *user, size_t maxlen) +ACE_INLINE LPTSTR +ACE_OS::cuserid (LPTSTR user, size_t maxlen) { // ACE_TRACE ("ACE_OS::cuserid"); #if defined (VXWORKS) @@ -2675,8 +2675,10 @@ ACE_OS::inet_aton (const char *host_name, struct in_addr *addr) && ACE_OS::strcmp (host_name, "255.255.255.255") != 0) return 0; else if (addr != 0) - ACE_OS::memcpy ((void *) addr, (void *) &ip_addr, sizeof ip_addr); - return 1; + { + ACE_OS::memcpy ((void *) addr, (void *) &ip_addr, sizeof ip_addr); + return 1; + } } ACE_INLINE char * @@ -4195,7 +4197,7 @@ ACE_OS::hostname (char name[], size_t maxnamelen) { // ACE_TRACE ("ACE_OS::uname"); #if defined (ACE_WIN32) - ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::GetComputerName (name, LPDWORD (&maxnamelen)), + ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::GetComputerNameA (name, LPDWORD (&maxnamelen)), ace_result_), int, -1); #else /* !ACE_WIN32 */ struct utsname host_info; @@ -4353,7 +4355,7 @@ ACE_OS::dlopen (ACE_DL_TYPE filename, int mode) #elif defined (ACE_WIN32) ACE_UNUSED_ARG(mode); - ACE_OSCALL_RETURN (::LoadLibrary (filename), void *, 0); + ACE_OSCALL_RETURN (::LoadLibraryA (filename), void *, 0); #else ACE_NOTSUP_RETURN (0); #endif /* ACE_HAS_SVR4_DYNAMIC_LINKING */ @@ -4860,7 +4862,7 @@ ACE_OS::shmget (key_t key, int size, int flags) } ACE_INLINE ACE_HANDLE -ACE_OS::open (LPCTSTR filename, +ACE_OS::open (const char *filename, int mode, int perms) { @@ -4890,11 +4892,11 @@ ACE_OS::open (LPCTSTR filename, if (ACE_BIT_ENABLED (mode, _O_TEMPORARY)) flags |= FILE_FLAG_DELETE_ON_CLOSE; - ACE_HANDLE h = ::CreateFile (filename, access, - FILE_SHARE_READ | FILE_SHARE_WRITE, - 0, creation, - flags, - 0); + ACE_HANDLE h = ::CreateFileA (filename, access, + FILE_SHARE_READ | FILE_SHARE_WRITE, + 0, creation, + flags, + 0); if (h == ACE_INVALID_HANDLE) { diff --git a/ace/ReactorEx.cpp b/ace/ReactorEx.cpp index 9801021e9a9..fb2969f69d2 100644 --- a/ace/ReactorEx.cpp +++ b/ace/ReactorEx.cpp @@ -123,7 +123,8 @@ ACE_ReactorEx::schedule_timer (ACE_Event_Handler *handler, // how_long expired, and 1 if events were dispatched. int ACE_ReactorEx::handle_events (ACE_Time_Value *how_long, - int wait_all) + int wait_all, + ACE_Event_Handler *wait_all_callback) { ACE_TRACE ("ACE_ReactorEx::handle_events"); @@ -169,61 +170,87 @@ ACE_ReactorEx::handle_events (ACE_Time_Value *how_long, errno = ETIME; return 0; case WAIT_ABANDONED_0: - // We'll let dispatch_all worry about abandoned mutexes. + // We'll let dispatch worry about abandoned mutexes. default: // Dispatch. - return this->dispatch_all (wait_status - WAIT_OBJECT_0, wait_all); + if (wait_all != 0) + return this->dispatch (wait_all_callback); + else + return this->dispatch (wait_status - WAIT_OBJECT_0); + } +} + +int +ACE_ReactorEx::dispatch (ACE_Event_Handler *wait_all_callback) +{ + if (wait_all_callback != 0) + { + siginfo_t handles (this->handles_); + if (wait_call_callback->handle_signal (0, &handles) == -1) + // Tim, what should happen if this call fails? Should all of + // the handles be removed? + return -1; + } + else + { + int result = 0; + + for (int i = 0; i < this->active_handles_; i++) + if (this->dispatch_handler (i) == -1) + result = -1; + + // Tim, if a result is != 0 should it contain a single -1, or + // perhaps the number of bad handler dispatches (negated, of + // course!). + return result; } } // Dispatches any active handles from handles_[-index-] to // handles_[active_handles_] using WaitForMultipleObjects to poll // through our handle set looking for active handles. + int -ACE_ReactorEx::dispatch_all (size_t index, int wait_all) +ACE_ReactorEx::dispatch (size_t index) { - while (1) + while (index < active_handles_) { - if (this->dispatch_handler (index) == 0) - index++; - - // Check if we're all out of handles. - if (index == active_handles_) - return 0; - - // If wait_all is TRUE, then we know that every handle is active - // and there's no need to call WaitForMultipleObjects; We just - // iterate through and dispatch each handler. - if (wait_all == 0) + // Tim, if this call fails is there really anything we + // can/should do about it? It seems that regardless of the + // success or failure, we should increment the index since + // otherwise we might just iterate endlessly! + this->dispatch_handler (index); + index++; + + DWORD wait_status = + ::WaitForMultipleObjects (active_handles_ - index, + &handles_[index], + FALSE, 0); // We're polling. + + switch (wait_status) { - DWORD wait_status = - ::WaitForMultipleObjects (active_handles_ - index, - &(handles_[index]), - FALSE, 0); // We're polling. - - switch (wait_status) - { - case WAIT_FAILED: // Failure. - errno = ::GetLastError (); - return -1; - case WAIT_TIMEOUT: - // There are no more handles ready, we can return. - return 0; - default: // Dispatch. - // Check if a handle successfully became signaled. - if ((wait_status >= WAIT_OBJECT_0) && - (wait_status < WAIT_OBJECT_0 + active_handles_)) - index += (wait_status - WAIT_OBJECT_0); - else - // Otherwise, a handle was abandoned. - index += (wait_status - WAIT_ABANDONED_0); - } + case WAIT_FAILED: // Failure. + errno = ::GetLastError (); + return -1; + case WAIT_TIMEOUT: + // There are no more handles ready, we can return. + return 0; + default: // Dispatch. + // Check if a handle successfully became signaled. + if (wait_status >= WAIT_OBJECT_0 && + wait_status < WAIT_OBJECT_0 + active_handles_) + index += wait_status - WAIT_OBJECT_0; + else + // Otherwise, a handle was abandoned. + index += waitstatus - WAIT_ABANDONED_0; } } -} + return 0; +} // Dispatches a single handler. Returns 0 on success, -1 if the // handler was removed. + int ACE_ReactorEx::dispatch_handler (int index) { diff --git a/ace/ReactorEx.h b/ace/ReactorEx.h index e51f9edfac3..6c4ec881b8a 100644 --- a/ace/ReactorEx.h +++ b/ace/ReactorEx.h @@ -130,25 +130,30 @@ public: virtual ~ACE_ReactorEx (void); // Close down the ReactorEx and release all of its resources. - // = Event loop drivers. - // Main event loop driver that blocks for -how_long- before - // returning (will return earlier if I/O or signal events occur). - // Note that -how_long- can be 0, in which case this method blocks - // until I/O events or signals occur. Returns 0 if timed out, 1 if - // an event occurred, and -1 if an error occured. -how_long- is - // decremented to reflect how much time the call to handle_events - // took. For instance, if a time value of 3 seconds is passed to - // handle_events and an event occurs after 2 seconds, -how_long- - // will equal 1 second. This can be used if an application wishes - // to handle events for some fixed amount of time. If wait_all is - // TRUE, then handle_events will only dispatch the handlers if *all* - // handles become active. If a timeout occurs, then no handlers - // will be dispatched. + // = Event loop drivers. Main event loop driver that blocks for + // -how_long- before returning (will return earlier if I/O or signal + // events occur). Note that -how_long- can be 0, in which case this + // method blocks until I/O events or signals occur. Returns 0 if + // timed out, 1 if an event occurred, and -1 if an error occured. + // -how_long- is decremented to reflect how much time the call to + // handle_events took. For instance, if a time value of 3 seconds + // is passed to handle_events and an event occurs after 2 seconds, + // -how_long- will equal 1 second. This can be used if an + // application wishes to handle events for some fixed amount of + // time. If wait_all is TRUE, then handle_events will only dispatch + // the handlers if *all* handles become active. If a timeout + // occurs, then no handlers will be dispatched. If + // <wait_all_callback> is NULL then we dispatch the <handle_signal> + // method on each and every HANDLE in the dispatch array. + // Otherwise, we just call back the <handle_signal> method of the + // <wait_all_callback> object, after first assigning the siginfo_t + // <si_handles_> argument to point to the array of signaled handles. virtual int handle_events (ACE_Time_Value *how_long = 0, - int wait_all = 0); + int wait_all = 0, + ACE_Event_Handler *wait_all_callback = 0); virtual int handle_events (ACE_Time_Value &how_long, - int wait_all = 0); - + int wait_all = 0, + ACE_Event_Handler *wait_all_callback = 0); // = Register and remove Handlers. virtual int register_handler (ACE_Event_Handler *eh, @@ -204,11 +209,21 @@ public: // Declare the dynamic allocation hooks. protected: - int dispatch_all (size_t index, int wait_all); + int dispatch (size_t index); // Dispatches any active handles from handles_[-index-] to - // handles_[active_handles_] using WaitForMultipleObjects to poll + // handles_[active_handles_] using <WaitForMultipleObjects> to poll // through our handle set looking for active handles. + int dispatch (ACE_Event_Handler *wait_all_callback); + // This is called when the user called handle_events() with the + // <wait_all> parameter enabled. In this case, all the handlers are + // now signaled. If <wait_all_callback> is NULL then we dispatch + // the <handle_signal> method on each and every HANDLE in the + // dispatch array. Otherwise, we just call back the <handle_signal> + // method of the <wait_all_callback> object, after first assigning + // the siginfo_t <si_handles_> argument to point to the array of + // signaled handles. + int dispatch_handler (int index); // Dispatches a single handler. Returns 0 on success, -1 if the // handler was removed. diff --git a/ace/Task.cpp b/ace/Task.cpp index 8152f66fe9d..3046b40a9c3 100644 --- a/ace/Task.cpp +++ b/ace/Task.cpp @@ -175,7 +175,7 @@ ACE_Task_Base::activate (long flags, if (this->thr_count_ > 0 && force_active == 0) return 1; // Already active. else - this->thr_count_ = n_threads; + this->thr_count_ += n_threads; // Use the ACE_Thread_Manager singleton if we're running as an // active object and the caller didn't supply us with a diff --git a/ace/Task_T.i b/ace/Task_T.i index 9f5ba06054e..1a8837f24a3 100644 --- a/ace/Task_T.i +++ b/ace/Task_T.i @@ -57,6 +57,11 @@ template <ACE_SYNCH_1> ACE_INLINE void ACE_Task<ACE_SYNCH_2>::msg_queue (ACE_Message_Queue<ACE_SYNCH_2> *mq) { ACE_TRACE ("ACE_Task<ACE_SYNCH_2>::msg_queue"); + if (this->delete_msg_queue_) + { + delete this->msg_queue_; + this->delete_msg_queue_ = 0; + } this->msg_queue_ = mq; } diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp index 488fa1e683a..295b36ffda0 100644 --- a/examples/Reactor/ReactorEx/test_reactorEx.cpp +++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp @@ -34,7 +34,6 @@ #include "ace/Service_Config.h" #include "ace/Synch.h" #include "ace/Task.h" -#include "ace/OS.h" typedef ACE_Task<ACE_MT_SYNCH> MT_TASK; @@ -44,6 +43,7 @@ class Peer_Handler : public MT_TASK // and forward them to the server using proactive I/O. { public: + // = Initialization methods. Peer_Handler (int argc, char *argv[]); ~Peer_Handler (void); @@ -53,12 +53,12 @@ public: // hostname was specified from the command line. virtual int handle_output_complete (ACE_Message_Block *msg, - long bytes_transfered); + long bytes_transferred); // One of our asynchronous writes to the remote peer has completed. // Make sure it succeeded and then delete the message. virtual int handle_input_complete (ACE_Message_Block *msg, - long bytes_transfered); + long bytes_transferred); // The remote peer has sent us something. If it succeeded, print // out the message and reinitiate a read. Otherwise, fail. In both // cases, delete the message sent. @@ -73,14 +73,16 @@ public: // We've been removed from the ReactorEx. virtual int handle_output (ACE_HANDLE fd); - // Called when output events should start + // Called when output events should start. Note that this is + // automatically invoked by the + // <ACE_ReactorEx_Notificiation_Strategy>. private: ACE_SOCK_Stream stream_; // Socket that we have connected to the server. ACE_ReactorEx_Notification_Strategy strategy_; - // strategy created such that the reactorEx notifies us when + // The strategy object that the reactorEx uses to notify us when // something is added to the queue. // = Remote peer info. @@ -89,9 +91,6 @@ private: u_short port_; // Port number for remote host. - - // = Make Task happy. - int close (u_long) { return 0; } }; class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH> @@ -101,6 +100,7 @@ class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH> { public: STDIN_Handler (MT_TASK &ph); + // Initialization. virtual int open (void * = 0); // Activate object. @@ -109,17 +109,12 @@ public: // Shut down. int svc (void); - // Thread runs here. + // Thread runs here as an active object. private: - MT_TASK &ph_; - // Send all input to ph_. - static void handler (int signum); - // Handle a ^C. (Do nothing). - - int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } - // Make Task happy. + // Handle a ^C. (Do nothing, this just illustrates how we can catch + // signals along with the other things). void register_thread_exit_hook (void); // Helper function to register with the ReactorEx for thread exit. @@ -127,6 +122,12 @@ private: virtual int handle_signal (int index, siginfo_t *, ucontext_t *); // The STDIN thread has exited. This means the user hit ^C. We can // end the event loop. + + MT_TASK &ph_; + // Send all input to ph_. + + ACE_HANDLE thr_handle_; + // Handle of our thread. }; Peer_Handler::Peer_Handler (int argc, char *argv[]) @@ -139,10 +140,7 @@ Peer_Handler::Peer_Handler (int argc, char *argv[]) // This code sets up the message to notify us when a new message is // added to the queue. Actually, the queue notifies ReactorEx which // then notifies us. - ACE_Message_Queue<ACE_MT_SYNCH>* mq; - ACE_NEW (mq, ACE_Message_Queue<ACE_MT_SYNCH>); - mq->notification_strategy (&this->strategy_); - this->msg_queue (mq); + this->msg_queue ()->notification_strategy (&this->strategy_); ACE_Get_Opt get_opt (argc, argv, "h:p:"); int c; @@ -163,8 +161,6 @@ Peer_Handler::Peer_Handler (int argc, char *argv[]) Peer_Handler::~Peer_Handler (void) { - ACE_Message_Queue<Peer_Handler::SYNCH>* mq = this->msg_queue (); - delete mq; } // This method creates the network connection to the remote peer. It @@ -206,11 +202,11 @@ Peer_Handler::open (void *) int Peer_Handler::handle_output_complete (ACE_Message_Block *msg, - long bytes_transfered) + long bytes_transferred) { - if (bytes_transfered <= 0) + if (bytes_transferred <= 0) ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed", - bytes_transfered)); + bytes_transferred)); // This was allocated by the STDIN_Handler, queued, dequeued, // passed to the proactor, and now passed back to us. @@ -224,11 +220,11 @@ Peer_Handler::handle_output_complete (ACE_Message_Block *msg, int Peer_Handler::handle_input_complete (ACE_Message_Block *msg, - long bytes_transfered) + long bytes_transferred) { - if ((bytes_transfered > 0) && (msg->length () > 0)) + if (bytes_transferred > 0 && msg->length () > 0) { - msg->rd_ptr ()[bytes_transfered] = '\0'; + msg->rd_ptr ()[bytes_transferred] = '\0'; // Print out the message received from the server. ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ())); delete msg; @@ -279,6 +275,7 @@ Peer_Handler::handle_output (ACE_HANDLE fd) ACE_Time_Value tv (ACE_Time_Value::zero); + // Forward the message to the remote peer receiver. if (this->getq (mb, &tv) != -1) { if (ACE_Service_Config::proactor ()-> @@ -345,18 +342,25 @@ STDIN_Handler::svc (void) if (read_result > 0) { mb->wr_ptr (read_result); + // Note that this call will first enqueue mb onto the peer + // handler's message queue, which will then turn around and + // notify the ReactorEx via the Notification_Strategy. This + // will subsequently signal the Peer_Handler, which will + // react by calling back to its handle_output() method, + // which dequeues the message and sends it to the peer + // across the network. this->ph_.putq (mb); } else break; } - // handle_signal will get called. - return 42; + // handle_signal will get called on the main proactor thread since + // we just exited and the main thread is waiting on our thread exit. + return 0; } -// Register an exit hook with the reactorEx. All this junk is testing -// out how ACE_Thread::self (hthread_id&) doesn't work!! +// Register an exit hook with the reactorEx. void STDIN_Handler::register_thread_exit_hook (void) @@ -381,9 +385,10 @@ STDIN_Handler::register_thread_exit_hook (void) // end the event loop and delete ourself. int -STDIN_Handler::handle_signal (int, siginfo_t *, ucontext_t *) +STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) { ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n")); + ACE_ASSERT (this->thr_handle_ == si->si_handle_); ACE_Service_Config::end_reactorEx_event_loop (); return 0; } @@ -403,6 +408,7 @@ main (int argc, char *argv[]) // Open active object for reading from stdin. STDIN_Handler stdin_handler (peer_handler); + // Spawn thread. if (stdin_handler.open () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p open failed, errno = %d.\n", @@ -418,5 +424,5 @@ main (int argc, char *argv[]) // Run main event demultiplexor. ACE_Service_Config::run_reactorEx_event_loop (); - return 42; + return 0; } diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp index 488fa1e683a..295b36ffda0 100644 --- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp +++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp @@ -34,7 +34,6 @@ #include "ace/Service_Config.h" #include "ace/Synch.h" #include "ace/Task.h" -#include "ace/OS.h" typedef ACE_Task<ACE_MT_SYNCH> MT_TASK; @@ -44,6 +43,7 @@ class Peer_Handler : public MT_TASK // and forward them to the server using proactive I/O. { public: + // = Initialization methods. Peer_Handler (int argc, char *argv[]); ~Peer_Handler (void); @@ -53,12 +53,12 @@ public: // hostname was specified from the command line. virtual int handle_output_complete (ACE_Message_Block *msg, - long bytes_transfered); + long bytes_transferred); // One of our asynchronous writes to the remote peer has completed. // Make sure it succeeded and then delete the message. virtual int handle_input_complete (ACE_Message_Block *msg, - long bytes_transfered); + long bytes_transferred); // The remote peer has sent us something. If it succeeded, print // out the message and reinitiate a read. Otherwise, fail. In both // cases, delete the message sent. @@ -73,14 +73,16 @@ public: // We've been removed from the ReactorEx. virtual int handle_output (ACE_HANDLE fd); - // Called when output events should start + // Called when output events should start. Note that this is + // automatically invoked by the + // <ACE_ReactorEx_Notificiation_Strategy>. private: ACE_SOCK_Stream stream_; // Socket that we have connected to the server. ACE_ReactorEx_Notification_Strategy strategy_; - // strategy created such that the reactorEx notifies us when + // The strategy object that the reactorEx uses to notify us when // something is added to the queue. // = Remote peer info. @@ -89,9 +91,6 @@ private: u_short port_; // Port number for remote host. - - // = Make Task happy. - int close (u_long) { return 0; } }; class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH> @@ -101,6 +100,7 @@ class STDIN_Handler : public ACE_Task<ACE_NULL_SYNCH> { public: STDIN_Handler (MT_TASK &ph); + // Initialization. virtual int open (void * = 0); // Activate object. @@ -109,17 +109,12 @@ public: // Shut down. int svc (void); - // Thread runs here. + // Thread runs here as an active object. private: - MT_TASK &ph_; - // Send all input to ph_. - static void handler (int signum); - // Handle a ^C. (Do nothing). - - int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } - // Make Task happy. + // Handle a ^C. (Do nothing, this just illustrates how we can catch + // signals along with the other things). void register_thread_exit_hook (void); // Helper function to register with the ReactorEx for thread exit. @@ -127,6 +122,12 @@ private: virtual int handle_signal (int index, siginfo_t *, ucontext_t *); // The STDIN thread has exited. This means the user hit ^C. We can // end the event loop. + + MT_TASK &ph_; + // Send all input to ph_. + + ACE_HANDLE thr_handle_; + // Handle of our thread. }; Peer_Handler::Peer_Handler (int argc, char *argv[]) @@ -139,10 +140,7 @@ Peer_Handler::Peer_Handler (int argc, char *argv[]) // This code sets up the message to notify us when a new message is // added to the queue. Actually, the queue notifies ReactorEx which // then notifies us. - ACE_Message_Queue<ACE_MT_SYNCH>* mq; - ACE_NEW (mq, ACE_Message_Queue<ACE_MT_SYNCH>); - mq->notification_strategy (&this->strategy_); - this->msg_queue (mq); + this->msg_queue ()->notification_strategy (&this->strategy_); ACE_Get_Opt get_opt (argc, argv, "h:p:"); int c; @@ -163,8 +161,6 @@ Peer_Handler::Peer_Handler (int argc, char *argv[]) Peer_Handler::~Peer_Handler (void) { - ACE_Message_Queue<Peer_Handler::SYNCH>* mq = this->msg_queue (); - delete mq; } // This method creates the network connection to the remote peer. It @@ -206,11 +202,11 @@ Peer_Handler::open (void *) int Peer_Handler::handle_output_complete (ACE_Message_Block *msg, - long bytes_transfered) + long bytes_transferred) { - if (bytes_transfered <= 0) + if (bytes_transferred <= 0) ACE_DEBUG ((LM_DEBUG, "%p bytes = %d\n", "Message failed", - bytes_transfered)); + bytes_transferred)); // This was allocated by the STDIN_Handler, queued, dequeued, // passed to the proactor, and now passed back to us. @@ -224,11 +220,11 @@ Peer_Handler::handle_output_complete (ACE_Message_Block *msg, int Peer_Handler::handle_input_complete (ACE_Message_Block *msg, - long bytes_transfered) + long bytes_transferred) { - if ((bytes_transfered > 0) && (msg->length () > 0)) + if (bytes_transferred > 0 && msg->length () > 0) { - msg->rd_ptr ()[bytes_transfered] = '\0'; + msg->rd_ptr ()[bytes_transferred] = '\0'; // Print out the message received from the server. ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ())); delete msg; @@ -279,6 +275,7 @@ Peer_Handler::handle_output (ACE_HANDLE fd) ACE_Time_Value tv (ACE_Time_Value::zero); + // Forward the message to the remote peer receiver. if (this->getq (mb, &tv) != -1) { if (ACE_Service_Config::proactor ()-> @@ -345,18 +342,25 @@ STDIN_Handler::svc (void) if (read_result > 0) { mb->wr_ptr (read_result); + // Note that this call will first enqueue mb onto the peer + // handler's message queue, which will then turn around and + // notify the ReactorEx via the Notification_Strategy. This + // will subsequently signal the Peer_Handler, which will + // react by calling back to its handle_output() method, + // which dequeues the message and sends it to the peer + // across the network. this->ph_.putq (mb); } else break; } - // handle_signal will get called. - return 42; + // handle_signal will get called on the main proactor thread since + // we just exited and the main thread is waiting on our thread exit. + return 0; } -// Register an exit hook with the reactorEx. All this junk is testing -// out how ACE_Thread::self (hthread_id&) doesn't work!! +// Register an exit hook with the reactorEx. void STDIN_Handler::register_thread_exit_hook (void) @@ -381,9 +385,10 @@ STDIN_Handler::register_thread_exit_hook (void) // end the event loop and delete ourself. int -STDIN_Handler::handle_signal (int, siginfo_t *, ucontext_t *) +STDIN_Handler::handle_signal (int, siginfo_t *si, ucontext_t *) { ACE_DEBUG ((LM_DEBUG, "STDIN thread has exited.\n")); + ACE_ASSERT (this->thr_handle_ == si->si_handle_); ACE_Service_Config::end_reactorEx_event_loop (); return 0; } @@ -403,6 +408,7 @@ main (int argc, char *argv[]) // Open active object for reading from stdin. STDIN_Handler stdin_handler (peer_handler); + // Spawn thread. if (stdin_handler.open () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p open failed, errno = %d.\n", @@ -418,5 +424,5 @@ main (int argc, char *argv[]) // Run main event demultiplexor. ACE_Service_Config::run_reactorEx_event_loop (); - return 42; + return 0; } |