summaryrefslogtreecommitdiff
path: root/ace/Svc_Handler.cpp
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1999-06-09 21:51:54 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1999-06-09 21:51:54 +0000
commit9c8ea09de3b59afd0bc3e84c2241fbb346175721 (patch)
treebc2dc54deb9aff3a205031cc9ec7ff3f62fd7a8b /ace/Svc_Handler.cpp
parentbc0265b91128210785c1f18354e5a63ddc03fd98 (diff)
downloadATCD-9c8ea09de3b59afd0bc3e84c2241fbb346175721.tar.gz
.
Diffstat (limited to 'ace/Svc_Handler.cpp')
-rw-r--r--ace/Svc_Handler.cpp272
1 files changed, 170 insertions, 102 deletions
diff --git a/ace/Svc_Handler.cpp b/ace/Svc_Handler.cpp
index 4aa7b10f018..9a301bc38f8 100644
--- a/ace/Svc_Handler.cpp
+++ b/ace/Svc_Handler.cpp
@@ -81,16 +81,11 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::operator delete (void *obj)
template <PR_ST_1, ACE_SYNCH_DECL>
ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Svc_Handler (ACE_Thread_Manager *tm,
ACE_Message_Queue<ACE_SYNCH_USE> *mq,
- ACE_Reactor *reactor,
- size_t maximum_buffer_size,
- ACE_Time_Value *timeout)
+ ACE_Reactor *reactor)
: ACE_Task<ACE_SYNCH_USE> (tm, mq),
closing_ (0),
recycler_ (0),
- recycling_act_ (0),
- maximum_buffer_size_ (maximum_buffer_size),
- timeout_ (timeout == 0 ? ACE_Time_Value::zero : *timeout),
- timeoutp_ (timeout)
+ recycling_act_ (0)
{
ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Svc_Handler");
@@ -184,105 +179,24 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::cleanup_hint (void)
}
-template <PR_ST_1, ACE_SYNCH_DECL> int
-ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::put (ACE_Message_Block *mb,
- ACE_Time_Value *tv)
-{
- // Enqueue <mb> onto the message queue.
- if (this->putq (mb, tv) == -1)
- return -1;
- else
- {
- // Flush the buffer when the number of bytes exceeds the maximum
- // buffer size or when the timeout period has elapsed.
- if (mb->total_size () >= this->maximum_buffer_size_
- || (this->timeoutp_ != 0
- && this->timeout_ > ACE_OS::gettimeofday ()))
- return this->flush ();
-
- return 0;
- }
-}
-
-// Flush the buffer.
-
-template <PR_ST_1, ACE_SYNCH_DECL> int
-ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::flush (void)
-{
- // @@ Doug, need to add appropriate locks!
- ACE_Message_Block *entry = 0;
- iovec iov[IOV_MAX];
- size_t i = 0;
- int result = 0;
-
- // Iterate over all the <ACE_Message_Block>s in the
- // <ACE_Message_Queue> and prepare them to be written out.
- for (ACE_Message_Queue_Iterator<ACE_SYNCH_USE> iterator (*this->msg_queue ());
- iterator.next (entry) != 0
- && result == 0;
- iterator.advance ())
- {
- // Iterate over all the <Message_Block>s in a chain, including
- // continuations.
- for (ACE_Message_Block *temp = entry;
- temp != 0;
- temp = entry->cont ())
- {
- iov[i].iov_len = temp->length ();
- iov[i].iov_base = temp->rd_ptr ();
-
- i++;
-
- // Flush the <iovec>s when we've reached the maximum size
- // for the platform.
- if (i == IOV_MAX)
- {
-#if defined (ACE_DEBUGGING)
- ACE_DEBUG ((LM_DEBUG,
- "sending data (inside loop, i = %d)\n",
- i));
-#endif /* ACE_DEBUGGING */
- // Send off the data.
- if (this->peer ().sendv_n (iov,
- i) == -1)
- {
- result = -1;
- break;
- }
- i = 0;
- }
- }
- }
-
- // Take care of any remaining <iovec>s.
- if (i > 0 && result != -1)
- {
- if (this->peer ().sendv_n (iov, i) == -1)
- result = -1;
-#if defined (ACE_DEBUGGING)
- ACE_DEBUG ((LM_DEBUG,
- "sending data (final flush, i = %d)\n",
- i));
-#endif /* ACE_DEBUGGING */
- }
-
- // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue>
- // and <release> their memory.
- while (this->msg_queue ()->is_empty () == 0)
- {
- if (this->msg_queue ()->dequeue_head (entry) == -1)
- break;
-
- entry->release ();
- }
-
- return result;
-}
-
template <PR_ST_1, ACE_SYNCH_DECL> void
ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump (void) const
{
ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump");
+
+ this->peer_.dump ();
+ ACE_DEBUG ((LM_DEBUG,
+ "dynamic_ = %d\n",
+ this->dynamic_));
+ ACE_DEBUG ((LM_DEBUG,
+ "closing_ = %d\n",
+ this->closing_));
+ ACE_DEBUG ((LM_DEBUG,
+ "recycler_ = %d\n",
+ this->recycler_));
+ ACE_DEBUG ((LM_DEBUG,
+ "recycling_act_ = %d\n",
+ this->recycling_act_));
}
template <PR_ST_1, ACE_SYNCH_DECL> ACE_PEER_STREAM &
@@ -414,6 +328,160 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::recycle (void *)
return 0;
}
+template <PR_ST_1, ACE_SYNCH_DECL>
+ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::~ACE_Buffered_Svc_Handler (void)
+{
+ this->flush ();
+}
+
+template <PR_ST_1, ACE_SYNCH_DECL>
+ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Buffered_Svc_Handler (ACE_Thread_Manager *tm,
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq,
+ ACE_Reactor *reactor,
+ size_t maximum_buffer_size,
+ ACE_Time_Value *timeout)
+ : ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_USE> (tm, mq, reactor),
+ maximum_buffer_size_ (maximum_buffer_size),
+ current_buffer_size_ (0),
+ timeoutp_ (timeout)
+{
+ ACE_TRACE ("ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::ACE_Buffered_Svc_Handler");
+
+ if (this->timeoutp_ != 0)
+ {
+ this->interval_ = *timeout;
+ this->next_timeout_ = ACE_OS::gettimeofday () + this->interval_;
+ }
+}
+
+template <PR_ST_1, ACE_SYNCH_DECL> int
+ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->msg_queue ()->lock (), -1);
+
+ // Enqueue <mb> onto the message queue.
+ if (this->putq (mb, tv) == -1)
+ return -1;
+ else
+ {
+ // Update the current number of bytes on the queue.
+ this->current_buffer_size_ += mb->total_size ();
+
+ // Flush the buffer when the number of bytes exceeds the maximum
+ // buffer size or when the timeout period has elapsed.
+ if (this->current_buffer_size_ >= this->maximum_buffer_size_
+ || (this->timeoutp_ != 0
+ && this->next_timeout_ <= ACE_OS::gettimeofday ()))
+ return this->flush ();
+ else
+ return 0;
+ }
+}
+
+// Flush the buffer.
+
+template <PR_ST_1, ACE_SYNCH_DECL> int
+ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::flush (void)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->msg_queue ()->lock (), -1);
+ ACE_Message_Block *entry = 0;
+ iovec iov[IOV_MAX];
+ size_t i = 0;
+ int result = 0;
+
+ // Iterate over all the <ACE_Message_Block>s in the
+ // <ACE_Message_Queue> and prepare them to be written out.
+ for (ACE_Message_Queue_Iterator<ACE_SYNCH_USE> iterator (*this->msg_queue ());
+ iterator.next (entry) != 0
+ && result == 0;
+ iterator.advance ())
+ {
+ // Iterate over all the <Message_Block>s in a chain, including
+ // continuations.
+ for (ACE_Message_Block *temp = entry;
+ temp != 0;
+ temp = entry->cont ())
+ {
+ iov[i].iov_len = temp->length ();
+ iov[i].iov_base = temp->rd_ptr ();
+
+ i++;
+
+ // Flush the <iovec>s when we've reached the maximum size
+ // for the platform.
+ if (i == IOV_MAX)
+ {
+#if defined (ACE_DEBUGGING)
+ ACE_DEBUG ((LM_DEBUG,
+ "sending data (inside loop, i = %d)\n",
+ i));
+#endif /* ACE_DEBUGGING */
+ // Send off the data.
+ if (this->peer ().sendv_n (iov,
+ i) == -1)
+ {
+ result = -1;
+ break;
+ }
+ i = 0;
+ }
+ }
+ }
+
+ // Take care of any remaining <iovec>s.
+ if (i > 0 && result != -1)
+ {
+ if (this->peer ().sendv_n (iov, i) == -1)
+ result = -1;
+#if defined (ACE_DEBUGGING)
+ ACE_DEBUG ((LM_DEBUG,
+ "sending data (final flush, i = %d)\n",
+ i));
+#endif /* ACE_DEBUGGING */
+ }
+
+ // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue>
+ // and <release> their memory.
+ while (this->msg_queue ()->is_empty () == 0)
+ {
+ if (this->msg_queue ()->dequeue_head (entry) == -1)
+ break;
+
+ entry->release ();
+ }
+
+ if (this->timeoutp_ != 0)
+ // Update the next timeout period by adding the interval.
+ this->next_timeout_ += this->interval_;
+
+ this->current_buffer_size_ = 0;
+
+ return result;
+}
+
+template <PR_ST_1, ACE_SYNCH_DECL> void
+ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump (void) const
+{
+ ACE_TRACE ("ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump");
+
+ ACE_Buffered_Svc_Handler<PR_ST_2, ACE_SYNCH_USE>::dump ();
+ ACE_DEBUG ((LM_DEBUG,
+ "maximum_buffer_size_ = %d\n",
+ this->maximum_buffer_size_));
+ ACE_DEBUG ((LM_DEBUG,
+ "current_buffer_size_ = %d\n",
+ this->current_buffer_size_));
+ if (this->next_timeout_ != 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "next_timeout_.sec = %d, next_timeout_.usec = %d\n",
+ this->next_timeout_.sec (),
+ this->next_timeout_.usec ()));
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "timeoutp_ == NULL"));
+}
+
#undef PR_ST_1
#undef PR_ST_2
#endif /* ACE_SVC_HANDLER_C */