diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1999-06-09 21:51:54 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1999-06-09 21:51:54 +0000 |
commit | 9c8ea09de3b59afd0bc3e84c2241fbb346175721 (patch) | |
tree | bc2dc54deb9aff3a205031cc9ec7ff3f62fd7a8b /ace/Svc_Handler.cpp | |
parent | bc0265b91128210785c1f18354e5a63ddc03fd98 (diff) | |
download | ATCD-9c8ea09de3b59afd0bc3e84c2241fbb346175721.tar.gz |
.
Diffstat (limited to 'ace/Svc_Handler.cpp')
-rw-r--r-- | ace/Svc_Handler.cpp | 272 |
1 files changed, 170 insertions, 102 deletions
diff --git a/ace/Svc_Handler.cpp b/ace/Svc_Handler.cpp index 4aa7b10f018..9a301bc38f8 100644 --- a/ace/Svc_Handler.cpp +++ b/ace/Svc_Handler.cpp @@ -81,16 +81,11 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::operator delete (void *obj) template <PR_ST_1, ACE_SYNCH_DECL> ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Svc_Handler (ACE_Thread_Manager *tm, ACE_Message_Queue<ACE_SYNCH_USE> *mq, - ACE_Reactor *reactor, - size_t maximum_buffer_size, - ACE_Time_Value *timeout) + ACE_Reactor *reactor) : ACE_Task<ACE_SYNCH_USE> (tm, mq), closing_ (0), recycler_ (0), - recycling_act_ (0), - maximum_buffer_size_ (maximum_buffer_size), - timeout_ (timeout == 0 ? ACE_Time_Value::zero : *timeout), - timeoutp_ (timeout) + recycling_act_ (0) { ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Svc_Handler"); @@ -184,105 +179,24 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::cleanup_hint (void) } -template <PR_ST_1, ACE_SYNCH_DECL> int -ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::put (ACE_Message_Block *mb, - ACE_Time_Value *tv) -{ - // Enqueue <mb> onto the message queue. - if (this->putq (mb, tv) == -1) - return -1; - else - { - // Flush the buffer when the number of bytes exceeds the maximum - // buffer size or when the timeout period has elapsed. - if (mb->total_size () >= this->maximum_buffer_size_ - || (this->timeoutp_ != 0 - && this->timeout_ > ACE_OS::gettimeofday ())) - return this->flush (); - - return 0; - } -} - -// Flush the buffer. - -template <PR_ST_1, ACE_SYNCH_DECL> int -ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::flush (void) -{ - // @@ Doug, need to add appropriate locks! - ACE_Message_Block *entry = 0; - iovec iov[IOV_MAX]; - size_t i = 0; - int result = 0; - - // Iterate over all the <ACE_Message_Block>s in the - // <ACE_Message_Queue> and prepare them to be written out. - for (ACE_Message_Queue_Iterator<ACE_SYNCH_USE> iterator (*this->msg_queue ()); - iterator.next (entry) != 0 - && result == 0; - iterator.advance ()) - { - // Iterate over all the <Message_Block>s in a chain, including - // continuations. - for (ACE_Message_Block *temp = entry; - temp != 0; - temp = entry->cont ()) - { - iov[i].iov_len = temp->length (); - iov[i].iov_base = temp->rd_ptr (); - - i++; - - // Flush the <iovec>s when we've reached the maximum size - // for the platform. - if (i == IOV_MAX) - { -#if defined (ACE_DEBUGGING) - ACE_DEBUG ((LM_DEBUG, - "sending data (inside loop, i = %d)\n", - i)); -#endif /* ACE_DEBUGGING */ - // Send off the data. - if (this->peer ().sendv_n (iov, - i) == -1) - { - result = -1; - break; - } - i = 0; - } - } - } - - // Take care of any remaining <iovec>s. - if (i > 0 && result != -1) - { - if (this->peer ().sendv_n (iov, i) == -1) - result = -1; -#if defined (ACE_DEBUGGING) - ACE_DEBUG ((LM_DEBUG, - "sending data (final flush, i = %d)\n", - i)); -#endif /* ACE_DEBUGGING */ - } - - // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue> - // and <release> their memory. - while (this->msg_queue ()->is_empty () == 0) - { - if (this->msg_queue ()->dequeue_head (entry) == -1) - break; - - entry->release (); - } - - return result; -} - template <PR_ST_1, ACE_SYNCH_DECL> void ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump (void) const { ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump"); + + this->peer_.dump (); + ACE_DEBUG ((LM_DEBUG, + "dynamic_ = %d\n", + this->dynamic_)); + ACE_DEBUG ((LM_DEBUG, + "closing_ = %d\n", + this->closing_)); + ACE_DEBUG ((LM_DEBUG, + "recycler_ = %d\n", + this->recycler_)); + ACE_DEBUG ((LM_DEBUG, + "recycling_act_ = %d\n", + this->recycling_act_)); } template <PR_ST_1, ACE_SYNCH_DECL> ACE_PEER_STREAM & @@ -414,6 +328,160 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::recycle (void *) return 0; } +template <PR_ST_1, ACE_SYNCH_DECL> +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::~ACE_Buffered_Svc_Handler (void) +{ + this->flush (); +} + +template <PR_ST_1, ACE_SYNCH_DECL> +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Buffered_Svc_Handler (ACE_Thread_Manager *tm, + ACE_Message_Queue<ACE_SYNCH_USE> *mq, + ACE_Reactor *reactor, + size_t maximum_buffer_size, + ACE_Time_Value *timeout) + : ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE> (tm, mq, reactor), + maximum_buffer_size_ (maximum_buffer_size), + current_buffer_size_ (0), + timeoutp_ (timeout) +{ + ACE_TRACE ("ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Buffered_Svc_Handler"); + + if (this->timeoutp_ != 0) + { + this->interval_ = *timeout; + this->next_timeout_ = ACE_OS::gettimeofday () + this->interval_; + } +} + +template <PR_ST_1, ACE_SYNCH_DECL> int +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->msg_queue ()->lock (), -1); + + // Enqueue <mb> onto the message queue. + if (this->putq (mb, tv) == -1) + return -1; + else + { + // Update the current number of bytes on the queue. + this->current_buffer_size_ += mb->total_size (); + + // Flush the buffer when the number of bytes exceeds the maximum + // buffer size or when the timeout period has elapsed. + if (this->current_buffer_size_ >= this->maximum_buffer_size_ + || (this->timeoutp_ != 0 + && this->next_timeout_ <= ACE_OS::gettimeofday ())) + return this->flush (); + else + return 0; + } +} + +// Flush the buffer. + +template <PR_ST_1, ACE_SYNCH_DECL> int +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::flush (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->msg_queue ()->lock (), -1); + ACE_Message_Block *entry = 0; + iovec iov[IOV_MAX]; + size_t i = 0; + int result = 0; + + // Iterate over all the <ACE_Message_Block>s in the + // <ACE_Message_Queue> and prepare them to be written out. + for (ACE_Message_Queue_Iterator<ACE_SYNCH_USE> iterator (*this->msg_queue ()); + iterator.next (entry) != 0 + && result == 0; + iterator.advance ()) + { + // Iterate over all the <Message_Block>s in a chain, including + // continuations. + for (ACE_Message_Block *temp = entry; + temp != 0; + temp = entry->cont ()) + { + iov[i].iov_len = temp->length (); + iov[i].iov_base = temp->rd_ptr (); + + i++; + + // Flush the <iovec>s when we've reached the maximum size + // for the platform. + if (i == IOV_MAX) + { +#if defined (ACE_DEBUGGING) + ACE_DEBUG ((LM_DEBUG, + "sending data (inside loop, i = %d)\n", + i)); +#endif /* ACE_DEBUGGING */ + // Send off the data. + if (this->peer ().sendv_n (iov, + i) == -1) + { + result = -1; + break; + } + i = 0; + } + } + } + + // Take care of any remaining <iovec>s. + if (i > 0 && result != -1) + { + if (this->peer ().sendv_n (iov, i) == -1) + result = -1; +#if defined (ACE_DEBUGGING) + ACE_DEBUG ((LM_DEBUG, + "sending data (final flush, i = %d)\n", + i)); +#endif /* ACE_DEBUGGING */ + } + + // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue> + // and <release> their memory. + while (this->msg_queue ()->is_empty () == 0) + { + if (this->msg_queue ()->dequeue_head (entry) == -1) + break; + + entry->release (); + } + + if (this->timeoutp_ != 0) + // Update the next timeout period by adding the interval. + this->next_timeout_ += this->interval_; + + this->current_buffer_size_ = 0; + + return result; +} + +template <PR_ST_1, ACE_SYNCH_DECL> void +ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump (void) const +{ + ACE_TRACE ("ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump"); + + ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump (); + ACE_DEBUG ((LM_DEBUG, + "maximum_buffer_size_ = %d\n", + this->maximum_buffer_size_)); + ACE_DEBUG ((LM_DEBUG, + "current_buffer_size_ = %d\n", + this->current_buffer_size_)); + if (this->next_timeout_ != 0) + ACE_DEBUG ((LM_DEBUG, + "next_timeout_.sec = %d, next_timeout_.usec = %d\n", + this->next_timeout_.sec (), + this->next_timeout_.usec ())); + else + ACE_DEBUG ((LM_DEBUG, + "timeoutp_ == NULL")); +} + #undef PR_ST_1 #undef PR_ST_2 #endif /* ACE_SVC_HANDLER_C */ |