diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1999-06-09 21:51:54 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1999-06-09 21:51:54 +0000 |
commit | 9c8ea09de3b59afd0bc3e84c2241fbb346175721 (patch) | |
tree | bc2dc54deb9aff3a205031cc9ec7ff3f62fd7a8b | |
parent | bc0265b91128210785c1f18354e5a63ddc03fd98 (diff) | |
download | ATCD-9c8ea09de3b59afd0bc3e84c2241fbb346175721.tar.gz |
.
-rw-r--r-- | ChangeLog-99b | 46 | ||||
-rw-r--r-- | TAO/ChangeLog-99c | 52 | ||||
-rw-r--r-- | ace/Message_Queue_T.h | 13 | ||||
-rw-r--r-- | ace/Message_Queue_T.i | 6 | ||||
-rw-r--r-- | ace/OS.h | 32 | ||||
-rw-r--r-- | ace/Svc_Conf_y.cpp | 18 | ||||
-rw-r--r-- | ace/Svc_Handler.cpp | 272 | ||||
-rw-r--r-- | ace/Svc_Handler.h | 87 | ||||
-rw-r--r-- | examples/Threads/task_five.cpp | 58 | ||||
-rw-r--r-- | tests/Svc_Handler_Test.cpp | 8 |
10 files changed, 381 insertions, 211 deletions
diff --git a/ChangeLog-99b b/ChangeLog-99b index 445f2bfbe19..2ace93a44dd 100644 --- a/ChangeLog-99b +++ b/ChangeLog-99b @@ -1,12 +1,24 @@ -Wed Jun 9 15:59:50 1999 Jeff Parsons <parsons@cs.wustl.edu> +Wed Jun 9 15:32:37 1999 Douglas C. Schmidt <schmidt@mambo.cs.wustl.edu> - * tests/version_tests/version_tests.dsw: - * tests/run_tests.bat: - * tests/run_tests.psosim: - * tests/run_tests.vxworks: - Added Svc_Handler_Test to these. + * ace/Message_Queue_T: Added a lock() accessor method so that + other components can access the lock used by a Message Queue. + + * ace/Svc_Handler.cpp: Added better support for timeout handling + for the ACE_Buffered_Svc_Handler. + + * ace/Svc_Handler: Split the buffering portion of ACE_Svc_Handler + off into a separate class called ACE_Buffered_Svc_Handler to + avoid adding extra space and complexity to the existing + ACE_Svc_Handler. Thanks to Irfan for suggesting this, as well + as also suggesting ways to improve the performance by caching + the current size of the buffer. -Wed Jun 9 13:06:43 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + * ace/OS.h: Added a patch to work around some problems with macros + for USYNCH_THREAD. Thanks to David Levine and Russ Noseworthy + for this. + + * examples/Threads/task_five.cpp (main): Improved the form and + content of this example a bit. * include/makeinclude/platform_sunos5_sunc++.GNU, * include/makeinclude/platform_sunos5_g++.GNU, @@ -20,14 +32,6 @@ Wed Jun 9 13:06:43 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> * ace/SPIPE_Stream: Added sendv(), sendv_n(), and recvv_n() methods to SPIPE_Stream. Thank to Carlos for pointing this out. -Wed Jun 9 12:23:30 1999 Jeff Parsons <parsons@cs.wustl.edu> - - * tests/tests.dsw: - Added Svc_Handler_Test.dsp (created and checked in by - Nanbor) to the workspace. - -Wed Jun 9 00:20:49 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> - * ace/config-mvs.h: Changed the size of the ACE_SIZEOF_DOUBLE and ACE_SIZEOF_LONG_DOUBLE from 4 and to 8 and 16, respectively. Thanks to Jim Rogers for reporting this. @@ -63,6 +67,18 @@ Wed Jun 9 00:20:49 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> is "full" or (2) a period of time elapses, at which point the queue is "flushed" via sendv_n() to the peer. +Wed Jun 9 12:23:30 1999 Jeff Parsons <parsons@cs.wustl.edu> + + * tests/version_tests/version_tests.dsw: + * tests/run_tests.bat: + * tests/run_tests.psosim: + * tests/run_tests.vxworks: + Added Svc_Handler_Test to these. + + * tests/tests.dsw: + Added Svc_Handler_Test.dsp (created and checked in by + Nanbor) to the workspace. + Wed Jun 9 10:46:07 1999 Carlos O'Ryan <coryan@cs.wustl.edu> * ace/Strategies.cpp: diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c index dc59d5ec834..910d532455e 100644 --- a/TAO/ChangeLog-99c +++ b/TAO/ChangeLog-99c @@ -1,19 +1,18 @@ Wed Jun 09 16:42:03 1999 David L. Levine <levine@cs.wustl.edu> * orbsvcs/orbsvcs/Sched/Config_Scheduler.cpp (compute_scheduling): - changed type of anomaly_severity_msg from char * to const char *, - to avoid warnings from Sun C++ 5.0 about string literal conversion. - Thanks to Doug for reporting this. Also, removed and changed a few - casts to ANSI-style casts. + changed type of anomaly_severity_msg from char * to const char + *, to avoid warnings from Sun C++ 5.0 about string literal + conversion. Thanks to Doug for reporting this. Also, removed + and changed a few casts to ANSI-style casts. Wed Jun 9 16:26:55 1999 Jeff Parsons <parsons@cs.wustl.edu> - * tao/Any.cpp: - Changed one branch of >>= (to_object) to reflect - the change in CORBA 2.3 - namely that the caller - is responsible for release of the result of this - call. The other branch already had this behavior. - Also added an ACE_NEW check in that other branch. + * tao/Any.cpp: Changed one branch of >>= (to_object) to reflect + the change in CORBA 2.3 - namely that the caller is responsible + for release of the result of this call. The other branch already + had this behavior. Also added an ACE_NEW check in that other + branch. Wed Jun 09 16:03:12 1999 Irfan Pyarali <irfan@cs.wustl.edu> @@ -21,34 +20,29 @@ Wed Jun 09 16:03:12 1999 Irfan Pyarali <irfan@cs.wustl.edu> * examples/POA/Forwarding/server.cpp * examples/POA/On_Demand_Activation/server.cpp * examples/POA/RootPOA/RootPOA.cpp - * examples/POA/TIE/server.cpp: + * examples/POA/TIE/server.cpp: Simple clean up. Also fixed one + memory leak. - Simple clean up. Also fixed one memory leak. + * tao/default_resource.cpp (init): -ORBReactorType handling and + selection of Reactor was incorrect. Correct behavior is that + "null" should select TAO_REACTOR_SELECT_ST and "token" should + select TAO_REACTOR_SELECT_MT. Thanks to Phil Mesnier + <mesnier_p@ociweb.com> for reporting this bug. Wed Jun 9 13:43:18 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> * TAO_IDL/fe: Regenerated the y.tab.cpp file with the new BYACC so - that we won't have any more annoying warnings about const - correctness. + that we won't have any more annoying warnings about const + correctness. * utils/nslist: Added a new utility program that prints out a - nicely organized list of Naming Contexts in a Naming Service. - Thanks to Thomas Lockhart <Thomas.G.Lockhart@jpl.nasa.gov> for - contributing this. - -Wed Jun 09 11:02:03 1999 Irfan Pyarali <irfan@cs.wustl.edu> - - * tao/default_resource.cpp (init): -ORBReactorType handling and - selection of Reactor was incorrect. Correct behavior is that - "null" should select TAO_REACTOR_SELECT_ST and "token" should - select TAO_REACTOR_SELECT_MT. Thanks to Phil Mesnier - <mesnier_p@ociweb.com> for reporting this bug. - -Wed Jun 9 10:32:15 1999 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + nicely organized list of Naming Contexts in a Naming Service. + Thanks to Thomas Lockhart <Thomas.G.Lockhart@jpl.nasa.gov> for + contributing this. * TAO_IDL/driver/drv_preproc.cpp (DRV_pre_proc): Fixed a bug that - was preventing the IDL compiler from putting files in the - current directory. Thanks to Byron Harris for reporting this. + was preventing the IDL compiler from putting files in the + current directory. Thanks to Byron Harris for reporting this. Tue Jun 08 21:13:36 1999 Irfan Pyarali <irfan@cs.wustl.edu> diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h index 95ece558c3a..90484d98a28 100644 --- a/ace/Message_Queue_T.h +++ b/ace/Message_Queue_T.h @@ -213,6 +213,9 @@ public: virtual ACE_Notification_Strategy *notification_strategy (void); virtual void notification_strategy (ACE_Notification_Strategy *s); + ACE_SYNCH_MUTEX_T &lock (void); + // Returns a reference to the lock used by the <ACE_Message_Queue>. + virtual void dump (void) const; // Dump the state of an object. @@ -221,6 +224,7 @@ public: protected: // = Routines that actually do the enqueueing and dequeueing. + // These routines assume that locks are held by the corresponding // public methods. Since they are virtual, you can change the // queueing mechanism by subclassing from <ACE_Message_Queue>. @@ -239,18 +243,25 @@ protected: // queue. // = Check the boundary conditions (assumes locks are held). + virtual int is_full_i (void); // True if queue is full, else false. + virtual int is_empty_i (void); // True if queue is empty, else false. - // = Implementation of the public activate() and deactivate() methods above (assumes locks are held). + // = Implementation of the public <activate> and <deactivate> methods. + + // These methods assume locks are held. + virtual int deactivate_i (void); // Deactivate the queue. + virtual int activate_i (void); // Activate the queue. // = Helper methods to factor out common #ifdef code. + virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, ACE_Time_Value *timeout); // Wait for the queue to become non-full. diff --git a/ace/Message_Queue_T.i b/ace/Message_Queue_T.i index c0517a71f47..b5ddf46a60f 100644 --- a/ace/Message_Queue_T.i +++ b/ace/Message_Queue_T.i @@ -149,4 +149,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void) return this->deactivated_; } +template <ACE_SYNCH_DECL> ACE_INLINE ACE_SYNCH_MUTEX_T & +ACE_Message_Queue<ACE_SYNCH_USE>::lock (void) +{ + return this->lock_; +} + ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator) @@ -2017,19 +2017,31 @@ typedef pthread_mutex_t ACE_thread_mutex_t; # if defined (ACE_HAS_PTHREADS_DRAFT4) # if defined (PTHREAD_PROCESS_PRIVATE) +# if !defined (USYNC_THREAD) # define USYNC_THREAD PTHREAD_PROCESS_PRIVATE +# endif /* ! USYNC_THREAD */ # else +# if !defined (USYNC_THREAD) # define USYNC_THREAD MUTEX_NONRECURSIVE_NP +# endif /* ! USYNC_THREAD */ # endif /* PTHREAD_PROCESS_PRIVATE */ # if defined (PTHREAD_PROCESS_SHARED) +# if !defined (USYNC_PROCESS) # define USYNC_PROCESS PTHREAD_PROCESS_SHARED +# endif /* ! USYNC_PROCESS */ # else +# if !defined (USYNC_PROCESS) # define USYNC_PROCESS MUTEX_NONRECURSIVE_NP +# endif /* ! USYNC_PROCESS */ # endif /* PTHREAD_PROCESS_SHARED */ # elif !defined (ACE_HAS_STHREADS) +# if !defined (USYNC_THREAD) # define USYNC_THREAD PTHREAD_PROCESS_PRIVATE +# endif /* ! USYNC_THREAD */ +# if !defined (USYNC_PROCESS) # define USYNC_PROCESS PTHREAD_PROCESS_SHARED +# endif /* ! USYNC_PROCESS */ # endif /* ACE_HAS_PTHREADS_DRAFT4 */ # define THR_BOUND 0x00000001 @@ -2078,14 +2090,8 @@ protected: # endif /* !ACE_HAS_POSIX_SEM */ # if defined (ACE_LACKS_PTHREAD_YIELD) && defined (ACE_HAS_THR_YIELD) -# if defined (USYNC_THREAD) -# undef USYNC_THREAD -# endif /* USYNC_THREAD */ -# if defined (USYNC_PROCESS) -# undef USYNC_PROCESS -# endif /* USYNC_PROCESS */ -// If we are on Solaris we can just reuse the existing implementations -// of these synchronization types. + // If we are on Solaris we can just reuse the existing + // implementations of these synchronization types. # if !defined (ACE_LACKS_RWLOCK_T) # include /**/ <synch.h> typedef rwlock_t ACE_rwlock_t; @@ -2311,11 +2317,11 @@ public: # define THR_CANCEL_ENABLE 0 # define THR_CANCEL_DEFERRED 0 # define THR_CANCEL_ASYNCHRONOUS 0 -# define THR_DETACHED 0x02000000 /* ?? ignore in most places */ -# define THR_BOUND 0 /* ?? ignore in most places */ -# define THR_NEW_LWP 0 /* ?? ignore in most places */ -# define THR_DAEMON 0 /* ?? ignore in most places */ -# define THR_JOINABLE 0 /* ?? ignore in most places */ +# define THR_DETACHED 0x02000000 /* ignore in most places */ +# define THR_BOUND 0 /* ignore in most places */ +# define THR_NEW_LWP 0 /* ignore in most places */ +# define THR_DAEMON 0 /* ignore in most places */ +# define THR_JOINABLE 0 /* ignore in most places */ # define THR_SUSPENDED CREATE_SUSPENDED # define THR_USE_AFX 0x01000000 # define THR_SCHED_FIFO 0 diff --git a/ace/Svc_Conf_y.cpp b/ace/Svc_Conf_y.cpp index bd946eb7b81..9083f7fd7e1 100644 --- a/ace/Svc_Conf_y.cpp +++ b/ace/Svc_Conf_y.cpp @@ -424,7 +424,7 @@ int ace_yyindent; #endif /* ACE_YYDEBUG_INDENT */ #ifndef ACE_YYDEBUG_REDUCE #ifdef __cplusplus -void ACE_YYDEBUG_REDUCE(int ace_yynew_state, int ace_yyrule_num, const char *ace_yyrule_string, int ace_yynew_indent, int ace_yyrhs_count) +void ACE_YYDEBUG_REDUCE(int /* ace_yynew_state */, int /* ace_yyrule_num */, const char *ace_yyrule_string, int ace_yynew_indent, int ace_yyrhs_count) #else ACE_YYDEBUG_REDUCE(ace_yynew_state, ace_yyrule_num, ace_yyrule_string, ace_yynew_indent, ace_yyrhs_count) int ace_yynew_state; @@ -454,7 +454,7 @@ int ace_yyrhs_count; #endif /* ACE_YYDEBUG_REDUCE */ #ifndef ACE_YYDEBUG_SHIFT_LEXEME #ifdef __cplusplus -void ACE_YYDEBUG_SHIFT_LEXEME(int ace_yyold_state, int ace_yynew_state, const char *ace_yytoken_string, int ace_yynew_indent) +void ACE_YYDEBUG_SHIFT_LEXEME(int /* ace_yyold_state */, int /* ace_yynew_state*/, const char *ace_yytoken_string, int ace_yynew_indent) #else ACE_YYDEBUG_SHIFT_LEXEME(ace_yyold_state, ace_yynew_state, ace_yytoken_string, ace_yynew_indent) int ace_yyold_state; @@ -469,7 +469,7 @@ int ace_yynew_indent; #endif /* ACE_YYDEBUG_SHIFT_LEXEME */ #ifndef ACE_YYDEBUG_LOOK_AHEAD #ifdef __cplusplus -void ACE_YYDEBUG_LOOK_AHEAD(int ace_yynew_state, int ace_yytoken_num, const char *ace_yytoken_string, int ace_yyindent) +void ACE_YYDEBUG_LOOK_AHEAD(int /* ace_yynew_state */, int ace_yytoken_num, const char *ace_yytoken_string, int ace_yyindent) #else ACE_YYDEBUG_LOOK_AHEAD(ace_yynew_state, ace_yytoken_num, ace_yytoken_string, ace_yyindent) int ace_yynew_state; @@ -486,7 +486,7 @@ int ace_yyindent; #endif /* ACE_YYDEBUG_LOOK_AHEAD */ #ifndef ACE_YYDEBUG_DISCARD_STATE #ifdef __cplusplus -void ACE_YYDEBUG_DISCARD_STATE(int ace_yynew_state, int ace_yyindent) +void ACE_YYDEBUG_DISCARD_STATE(int /* ace_yynew_state */, int ace_yyindent) #else ACE_YYDEBUG_DISCARD_STATE(ace_yynew_state, ace_yyindent) int ace_yynew_state; @@ -514,7 +514,7 @@ int ace_yyindent; #endif /* ACE_YYDEBUG_DISCARD_STATE */ #ifndef ACE_YYDEBUG_DISCARD_TOKEN #ifdef __cplusplus -void ACE_YYDEBUG_DISCARD_TOKEN(int ace_yynew_state, int ace_yytoken_num, const char *ace_yytoken_string, int ace_yyindent) +void ACE_YYDEBUG_DISCARD_TOKEN(int /* ace_yynew_state */, int /* ace_yytoken_num */, const char *ace_yytoken_string, int ace_yyindent) #else ACE_YYDEBUG_DISCARD_TOKEN(ace_yynew_state, ace_yytoken_num, ace_yytoken_string, ace_yyindent) int ace_yynew_state; @@ -529,7 +529,7 @@ int ace_yyindent; #endif /* ACE_YYDEBUG_DISCARD_TOKEN */ #ifndef ACE_YYDEBUG_SHIFT_ERROR_LEXEME #ifdef __cplusplus -void ACE_YYDEBUG_SHIFT_ERROR_LEXEME(int ace_yyold_state, int ace_yynew_state, int ace_yyindent) +void ACE_YYDEBUG_SHIFT_ERROR_LEXEME(int /* ace_yyold_state */, int /* ace_yynew_state */, int ace_yyindent) #else ACE_YYDEBUG_SHIFT_ERROR_LEXEME(ace_yyold_state, ace_yynew_state, ace_yyindent) int ace_yyold_state; @@ -555,7 +555,7 @@ ace_yyparse() extern char *ace_foo(); #endif - if (ace_yys = ACE_OS::getenv("ACE_YYDEBUG")) + if ((ace_yys = ACE_OS::getenv("ACE_YYDEBUG"))) { ace_yyn = *ace_yys; if (ace_yyn >= '0' && ace_yyn <= '9') @@ -572,7 +572,7 @@ ace_yyparse() *ace_yyssp = ace_yystate = 0; ace_yyloop: - if (ace_yyn = ace_yydefred[ace_yystate]) goto ace_yyreduce; + if ((ace_yyn = ace_yydefred[ace_yystate])) goto ace_yyreduce; if (ace_yychar < 0) { if ((ace_yychar = ace_yylex()) < 0) ace_yychar = 0; @@ -682,7 +682,7 @@ ace_yyinrecovery: if (5 < ace_yydebug) printf("ace_yydebug: state %d, error recovery discards token %d (%s)\n", ace_yystate, ace_yychar, ace_yys); - else + else ACE_YYDEBUG_DISCARD_TOKEN(ace_yystate, ace_yychar, ace_yys, ace_yyssp-ace_yyss); } #endif diff --git a/ace/Svc_Handler.cpp b/ace/Svc_Handler.cpp index 4aa7b10f018..9a301bc38f8 100644 --- a/ace/Svc_Handler.cpp +++ b/ace/Svc_Handler.cpp @@ -81,16 +81,11 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::operator delete (void *obj) template <PR_ST_1, ACE_SYNCH_DECL> ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Svc_Handler (ACE_Thread_Manager *tm, ACE_Message_Queue<ACE_SYNCH_USE> *mq, - ACE_Reactor *reactor, - size_t maximum_buffer_size, - ACE_Time_Value *timeout) + ACE_Reactor *reactor) : ACE_Task<ACE_SYNCH_USE> (tm, mq), closing_ (0), recycler_ (0), - recycling_act_ (0), - maximum_buffer_size_ (maximum_buffer_size), - timeout_ (timeout == 0 ? ACE_Time_Value::zero : *timeout), - timeoutp_ (timeout) + recycling_act_ (0) { ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Svc_Handler"); @@ -184,105 +179,24 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::cleanup_hint (void) } -template <PR_ST_1, ACE_SYNCH_DECL> int -ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::put (ACE_Message_Block *mb, - ACE_Time_Value *tv) -{ - // Enqueue <mb> onto the message queue. - if (this->putq (mb, tv) == -1) - return -1; - else - { - // Flush the buffer when the number of bytes exceeds the maximum - // buffer size or when the timeout period has elapsed. - if (mb->total_size () >= this->maximum_buffer_size_ - || (this->timeoutp_ != 0 - && this->timeout_ > ACE_OS::gettimeofday ())) - return this->flush (); - - return 0; - } -} - -// Flush the buffer. - -template <PR_ST_1, ACE_SYNCH_DECL> int -ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::flush (void) -{ - // @@ Doug, need to add appropriate locks! - ACE_Message_Block *entry = 0; - iovec iov[IOV_MAX]; - size_t i = 0; - int result = 0; - - // Iterate over all the <ACE_Message_Block>s in the - // <ACE_Message_Queue> and prepare them to be written out. - for (ACE_Message_Queue_Iterator<ACE_SYNCH_USE> iterator (*this->msg_queue ()); - iterator.next (entry) != 0 - && result == 0; - iterator.advance ()) - { - // Iterate over all the <Message_Block>s in a chain, including - // continuations. - for (ACE_Message_Block *temp = entry; - temp != 0; - temp = entry->cont ()) - { - iov[i].iov_len = temp->length (); - iov[i].iov_base = temp->rd_ptr (); - - i++; - - // Flush the <iovec>s when we've reached the maximum size - // for the platform. - if (i == IOV_MAX) - { -#if defined (ACE_DEBUGGING) - ACE_DEBUG ((LM_DEBUG, - "sending data (inside loop, i = %d)\n", - i)); -#endif /* ACE_DEBUGGING */ - // Send off the data. - if (this->peer ().sendv_n (iov, - i) == -1) - { - result = -1; - break; - } - i = 0; - } - } - } - - // Take care of any remaining <iovec>s. - if (i > 0 && result != -1) - { - if (this->peer ().sendv_n (iov, i) == -1) - result = -1; -#if defined (ACE_DEBUGGING) - ACE_DEBUG ((LM_DEBUG, - "sending data (final flush, i = %d)\n", - i)); -#endif /* ACE_DEBUGGING */ - } - - // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue> - // and <release> their memory. - while (this->msg_queue ()->is_empty () == 0) - { - if (this->msg_queue ()->dequeue_head (entry) == -1) - break; - - entry->release (); - } - - return result; -} - template <PR_ST_1, ACE_SYNCH_DECL> void ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump (void) const { ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump"); + + this->peer_.dump (); + ACE_DEBUG ((LM_DEBUG, + "dynamic_ = %d\n", + this->dynamic_)); + ACE_DEBUG ((LM_DEBUG, + "closing_ = %d\n", + this->closing_)); + ACE_DEBUG ((LM_DEBUG, + "recycler_ = %d\n", + this->recycler_)); + ACE_DEBUG ((LM_DEBUG, + "recycling_act_ = %d\n", + this->recycling_act_)); } template <PR_ST_1, ACE_SYNCH_DECL> ACE_PEER_STREAM & @@ -414,6 +328,160 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::recycle (void *) return 0; } +template <PR_ST_1, ACE_SYNCH_DECL> +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::~ACE_Buffered_Svc_Handler (void) +{ + this->flush (); +} + +template <PR_ST_1, ACE_SYNCH_DECL> +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Buffered_Svc_Handler (ACE_Thread_Manager *tm, + ACE_Message_Queue<ACE_SYNCH_USE> *mq, + ACE_Reactor *reactor, + size_t maximum_buffer_size, + ACE_Time_Value *timeout) + : ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE> (tm, mq, reactor), + maximum_buffer_size_ (maximum_buffer_size), + current_buffer_size_ (0), + timeoutp_ (timeout) +{ + ACE_TRACE ("ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Buffered_Svc_Handler"); + + if (this->timeoutp_ != 0) + { + this->interval_ = *timeout; + this->next_timeout_ = ACE_OS::gettimeofday () + this->interval_; + } +} + +template <PR_ST_1, ACE_SYNCH_DECL> int +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->msg_queue ()->lock (), -1); + + // Enqueue <mb> onto the message queue. + if (this->putq (mb, tv) == -1) + return -1; + else + { + // Update the current number of bytes on the queue. + this->current_buffer_size_ += mb->total_size (); + + // Flush the buffer when the number of bytes exceeds the maximum + // buffer size or when the timeout period has elapsed. + if (this->current_buffer_size_ >= this->maximum_buffer_size_ + || (this->timeoutp_ != 0 + && this->next_timeout_ <= ACE_OS::gettimeofday ())) + return this->flush (); + else + return 0; + } +} + +// Flush the buffer. + +template <PR_ST_1, ACE_SYNCH_DECL> int +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::flush (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->msg_queue ()->lock (), -1); + ACE_Message_Block *entry = 0; + iovec iov[IOV_MAX]; + size_t i = 0; + int result = 0; + + // Iterate over all the <ACE_Message_Block>s in the + // <ACE_Message_Queue> and prepare them to be written out. + for (ACE_Message_Queue_Iterator<ACE_SYNCH_USE> iterator (*this->msg_queue ()); + iterator.next (entry) != 0 + && result == 0; + iterator.advance ()) + { + // Iterate over all the <Message_Block>s in a chain, including + // continuations. + for (ACE_Message_Block *temp = entry; + temp != 0; + temp = entry->cont ()) + { + iov[i].iov_len = temp->length (); + iov[i].iov_base = temp->rd_ptr (); + + i++; + + // Flush the <iovec>s when we've reached the maximum size + // for the platform. + if (i == IOV_MAX) + { +#if defined (ACE_DEBUGGING) + ACE_DEBUG ((LM_DEBUG, + "sending data (inside loop, i = %d)\n", + i)); +#endif /* ACE_DEBUGGING */ + // Send off the data. + if (this->peer ().sendv_n (iov, + i) == -1) + { + result = -1; + break; + } + i = 0; + } + } + } + + // Take care of any remaining <iovec>s. + if (i > 0 && result != -1) + { + if (this->peer ().sendv_n (iov, i) == -1) + result = -1; +#if defined (ACE_DEBUGGING) + ACE_DEBUG ((LM_DEBUG, + "sending data (final flush, i = %d)\n", + i)); +#endif /* ACE_DEBUGGING */ + } + + // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue> + // and <release> their memory. + while (this->msg_queue ()->is_empty () == 0) + { + if (this->msg_queue ()->dequeue_head (entry) == -1) + break; + + entry->release (); + } + + if (this->timeoutp_ != 0) + // Update the next timeout period by adding the interval. + this->next_timeout_ += this->interval_; + + this->current_buffer_size_ = 0; + + return result; +} + +template <PR_ST_1, ACE_SYNCH_DECL> void +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump (void) const +{ + ACE_TRACE ("ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump"); + + ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump (); + ACE_DEBUG ((LM_DEBUG, + "maximum_buffer_size_ = %d\n", + this->maximum_buffer_size_)); + ACE_DEBUG ((LM_DEBUG, + "current_buffer_size_ = %d\n", + this->current_buffer_size_)); + if (this->next_timeout_ != 0) + ACE_DEBUG ((LM_DEBUG, + "next_timeout_.sec = %d, next_timeout_.usec = %d\n", + this->next_timeout_.sec (), + this->next_timeout_.usec ())); + else + ACE_DEBUG ((LM_DEBUG, + "timeoutp_ == NULL")); +} + #undef PR_ST_1 #undef PR_ST_2 #endif /* ACE_SVC_HANDLER_C */ diff --git a/ace/Svc_Handler.h b/ace/Svc_Handler.h index 0be5a242ec3..a1251999404 100644 --- a/ace/Svc_Handler.h +++ b/ace/Svc_Handler.h @@ -50,14 +50,10 @@ public: // = Initialization and termination methods. ACE_Svc_Handler (ACE_Thread_Manager *thr_mgr = 0, ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0, - ACE_Reactor *reactor = ACE_Reactor::instance (), - size_t max_buffer_size = 0, - ACE_Time_Value *timeout = 0); + ACE_Reactor *reactor = ACE_Reactor::instance ()); // Constructor initializes the <thr_mgr> and <mq> by passing them // down to the <ACE_Task> base class. The <reactor> is passed to - // the <ACE_Event_Handler>. The <max_buffer_size> and <timeout> are - // used to determine at what point to flush the <mq>. By default, - // there's no buffering at all. + // the <ACE_Event_Handler>. virtual ~ACE_Svc_Handler (void); // Destructor. @@ -70,18 +66,6 @@ public: // Object termination hook -- application-specific cleanup code goes // here. - virtual int put (ACE_Message_Block *message_block, - ACE_Time_Value *timeout = 0); - // Insert the <ACE_Message_Block> chain rooted at <message_block> - // into the <ACE_Message_Queue> with the designated <timeout>. The - // <flush> method will be called if this <put> causes the number of - // bytes to exceed the maximum buffer size or if the timeout period - // has elapsed. - - virtual int flush (void); - // Flush the <ACE_Message_Queue>, which writes all the queued - // <ACE_Message_Block>s to the <PEER_STREAM>. - virtual int idle (u_long flags = 0); // Call this method if you want to recycling the <Svc_Handler> // instead of closing it. If the object does not have a recycler, @@ -154,8 +138,9 @@ public: public: - // Note: The following methods are not suppose to be public. But - // because friendship is *not* inherited in C++, these methods have + // = The following methods are not suppose to be public. + + // Because friendship is *not* inherited in C++, these methods have // to be public. // = Accessors to set/get the connection recycler. @@ -177,7 +162,7 @@ public: // recycling. Return 0 if the object is ready for recycling, -1 on // failures. -private: +protected: ACE_PEER_STREAM peer_; // Maintain connection with client. @@ -194,14 +179,72 @@ private: const void *recycling_act_; // Asynchronous Completion Token (ACT) to be used to when talking to // the recycler. +}; +template <ACE_PEER_STREAM_1, ACE_SYNCH_DECL> +class ACE_Buffered_Svc_Handler : public ACE_Svc_Handler<ACE_PEER_STREAM_2, ACE_SYNCH_USE> +{ + // = TITLE + // Defines the interface for a service that exchanges data with + // its connected peer and supports buffering. + // + // = DESCRIPTION + // The buffering feature makes it possible to queue up + // <ACE_Message_Blocks> in an <ACE_Message_Queue> until (1) the + // queue is "full" or (2) a period of time elapses, at which + // point the queue is "flushed" via <sendv_n> to the peer. +public: + // = Initialization and termination methods. + ACE_Buffered_Svc_Handler (ACE_Thread_Manager *thr_mgr = 0, + ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0, + ACE_Reactor *reactor = ACE_Reactor::instance (), + size_t max_buffer_size = 0, + ACE_Time_Value *relative_timeout = 0); + // Constructor initializes the <thr_mgr> and <mq> by passing them + // down to the <ACE_Task> base class. The <reactor> is passed to + // the <ACE_Event_Handler>. The <max_buffer_size> and + // <relative_timeout> are used to determine at what point to flush + // the <mq>. By default, there's no buffering at all. The + // <relative_timeout> value is interpreted to be in a unit that's + // relative to the current time returned by <ACE_OS::gettimeofday>. + + virtual ~ACE_Buffered_Svc_Handler (void); + // Destructor, which calls <flush>. + + virtual int put (ACE_Message_Block *message_block, + ACE_Time_Value *timeout = 0); + // Insert the <ACE_Message_Block> chain rooted at <message_block> + // into the <ACE_Message_Queue> with the designated <timeout>. The + // <flush> method will be called if this <put> causes the number of + // bytes to exceed the maximum buffer size or if the timeout period + // has elapsed. + + virtual int flush (void); + // Flush the <ACE_Message_Queue>, which writes all the queued + // <ACE_Message_Block>s to the <PEER_STREAM>. + + virtual int handle_timeout (const ACE_Time_Value &time, + const void *); + // This method is not currently implemented -- this is where the + // integration with the <Reactor> would occur. + + void dump (void) const; + // Dump the state of an object. + +protected: size_t maximum_buffer_size_; // Maximum size the <Message_Queue> can be before we have to flush // the buffer. + + size_t current_buffer_size_; + // Current size in bytes of the <Message_Queue> contents. - ACE_Time_Value timeout_; + ACE_Time_Value next_timeout_; // Timeout value used to control when the buffer is flushed. + ACE_Time_Value interval_; + // Interval of the timeout. + ACE_Time_Value *timeoutp_; // Timeout pointer. }; diff --git a/examples/Threads/task_five.cpp b/examples/Threads/task_five.cpp index b8d4847d9f1..479be53deac 100644 --- a/examples/Threads/task_five.cpp +++ b/examples/Threads/task_five.cpp @@ -1,31 +1,45 @@ // $Id$ -// Stress testing thread creation and thread cancellation using -// ACE_Task. +// ============================================================================ // -// Author: Detlef Becker <Detlef.Becker@med.siemens.de> +// = LIBRARY +// examples/Threads/ +// +// = FILENAME +// task_five.cpp +// +// = DESCRIPTION +// Stress testing thread creation and thread cancellation using +// ACE_Task. +// +// = AUTHOR +// Author: Detlef Becker <Detlef.Becker@med.siemens.de> +// +// ============================================================================ + -#include "ace/Service_Config.h" #include "ace/Thread_Manager.h" #include "ace/Task.h" ACE_RCSID(Threads, task_five, "$Id$") -static const int DEFAULT_TASKS = 1000; -static size_t default_stack_size = // Default stack size +static const int DEFAULT_TASKS = 100; +static const int DEFAULT_ITERATIONS = 10; + +// Default stack size +static size_t default_stack_size = #if defined (ACE_WIN32) 0; #else 8192; -#endif - +#endif /* ACE_WIN32 */ u_int loop_count = 0; u_int error_count = 0; class Test_Task : public ACE_Task<ACE_SYNCH> { public: - Test_Task (ACE_Thread_Manager *thrmgr = ACE_Service_Config::thr_mgr ()); + Test_Task (ACE_Thread_Manager * = ACE_Thread_Manager::instance ()); ~Test_Task (void) {}; int open (void * = 0); @@ -83,19 +97,27 @@ Test_Task::synch (void) return thr_mgr_->wait_grp (grp_id_); } -void work (ACE_Thread_Manager *thr_mgr, int n_tasks, size_t stack_size) +static void +work (ACE_Thread_Manager *thr_mgr, + int n_tasks, + size_t stack_size) { ACE_UNUSED_ARG (stack_size); int i; - Test_Task *task_array = new Test_Task[n_tasks]; + Test_Task *task_array; + + ACE_NEW (task_array, + Test_Task[n_tasks]); ACE_DEBUG ((LM_DEBUG, "Opening Tasks, loop count = %d, error count = %d\n", loop_count, error_count)); - for (i = 0; i < n_tasks; i++) + for (i = 0; + i < n_tasks; + i++) task_array[i].open (); ACE_OS::sleep (1); @@ -113,7 +135,9 @@ void work (ACE_Thread_Manager *thr_mgr, int n_tasks, size_t stack_size) loop_count, error_count)); - for (i = 0; i < n_tasks; i++) + for (i = 0; + + i < n_tasks; i++) if (-1 == task_array[i].synch ()) { ACE_ERROR ((LM_ERROR, @@ -140,10 +164,12 @@ main (int argc, char *argv[]) { size_t stack_size = argc > 1 ? ACE_OS::atoi (argv[1]) : default_stack_size; const int n_tasks = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_TASKS; + u_int iterations = argc > 3 ? ACE_OS::atoi (argv[3]) : DEFAULT_ITERATIONS; - ACE_Thread_Manager *thr_mgr = ACE_Service_Config::thr_mgr (); + for (u_int i = 0; i < iterations; i++) + work (ACE_Thread_Manager::instance (), + n_tasks, + stack_size); - for (;;) - work (thr_mgr, n_tasks, stack_size); ACE_NOTREACHED (return 0); } diff --git a/tests/Svc_Handler_Test.cpp b/tests/Svc_Handler_Test.cpp index 1e3bec1ef64..2f62e6a6d72 100644 --- a/tests/Svc_Handler_Test.cpp +++ b/tests/Svc_Handler_Test.cpp @@ -10,7 +10,7 @@ // // = DESCRIPTION // This tests illustrates the "buffering" strategy of the -// <ACE_Svc_Handler>. +// <ACE_Buffered_Svc_Handler>. // // = AUTHORS // Douglas C. Schmidt <schmidt@cs.wustl.edu> @@ -29,7 +29,7 @@ USELIB("..\ace\aced.lib"); //--------------------------------------------------------------------------- #endif /* defined(__BORLANDC__) && __BORLANDC__ >= 0x0530 */ -typedef ACE_Svc_Handler <ACE_FILE_STREAM, ACE_NULL_SYNCH> SVC_HANDLER; +typedef ACE_Buffered_Svc_Handler <ACE_FILE_STREAM, ACE_NULL_SYNCH> SVC_HANDLER; static void run_test (SVC_HANDLER &svc_handler, @@ -125,7 +125,7 @@ main (int argc, ASYS_TCHAR *argv[]) } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Svc_Handler <ACE_FILE_STREAM, ACE_NULL_SYNCH>; +template class ACE_Buffered_Svc_Handler <ACE_FILE_STREAM, ACE_NULL_SYNCH>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Svc_Handler <ACE_FILE_STREAM, ACE_NULL_SYNCH> +#pragma instantiate ACE_Buffered_Svc_Handler <ACE_FILE_STREAM, ACE_NULL_SYNCH> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |