summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2002-07-28 14:11:49 +0000
committerbala <balanatarajan@users.noreply.github.com>2002-07-28 14:11:49 +0000
commit19a3f5ae3b7c94320e0b954edaaf765596b3d78c (patch)
tree7eee82ee25e5339a7203ba58e3b89490817c91aa
parenta18a408a1db9890a8a809e84f57248a71f220651 (diff)
downloadATCD-19a3f5ae3b7c94320e0b954edaaf765596b3d78c.tar.gz
ChangeLogTag: Sun Jul 28 09:03:08 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/ChangeLog92
-rw-r--r--TAO/tao/Asynch_Queued_Message.cpp95
-rw-r--r--TAO/tao/Asynch_Queued_Message.h25
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp50
-rw-r--r--TAO/tao/IIOP_Transport.cpp4
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp2
-rw-r--r--TAO/tao/Invocation.cpp11
-rw-r--r--TAO/tao/Invocation.h4
-rw-r--r--TAO/tao/Queued_Message.cpp6
-rw-r--r--TAO/tao/Queued_Message.h37
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp4
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp4
-rw-r--r--TAO/tao/Strategies/UIOP_Transport.cpp4
-rw-r--r--TAO/tao/Synch_Queued_Message.cpp73
-rw-r--r--TAO/tao/Synch_Queued_Message.h7
-rw-r--r--TAO/tao/TAO_Server_Request.cpp12
-rw-r--r--TAO/tao/Transport.cpp132
-rw-r--r--TAO/tao/Transport.h71
18 files changed, 529 insertions, 104 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index a80d3f846ee..9c147a073c3 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,95 @@
+Sun Jul 28 09:03:08 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ Merge from the branch bug_1125_stage_0. This merge solves the
+ dreaded problem of ORB building up the stack when a reply is
+ written. This should fix [BUG 1125].
+
+ Wed Jul 24 22:30:29 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Synch_Queued_Message.h:
+ * tao/Synch_Queued_Message.cpp:
+ * tao/Asynch_Queued_Message.h (TAO_Asynch_Queued_Message):
+ * tao/Asynch_Queued_Message.cpp: Added comments and spruced up
+ line spacings.
+
+ * tao/Transport.cpp: Added a debug statement for logging
+ purposes.
+
+
+ Mon Jul 22 17:52:04 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/GIOP_Message_Base.cpp: We now use the global pool for
+ allocating memory for the return path. Using TSS created
+ problems when we were trying to move the unsent messages from
+ the TSS to global pool. The message block makes things to crash
+ horribly, especially at places that you have no clue about.
+
+ * tao/Transport.cpp: Do not reset the allocators since it is not
+ required.
+
+ Tue Jul 16 07:06:33 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Synch_Queued_Message.cpp:
+ * tao/Asynch_Queued_Message.cpp: Removed code that reset stuff
+ from the destructor. This caused problems when trying to use
+ the ACE_DES_FREE macro since it called the destructor first and
+ then free ().
+
+ * tao/TAO_Server_Request.cpp: All calls to send_message () will
+ carry an argument mentioning that they are replies.
+
+ * tao/GIOP_Message_Base.cpp: Fixed a problem in dump_msg ()
+ method. The probelm was that we neved respected the byte-order
+ of the sending ORB when printing out request id's. Thanks to Kew
+ Whitney <Whitney.Kew@Invensys.com>
+
+ * tao/Transport.cpp: Cosmetic fixes.
+
+ Sun Jul 14 10:49:33 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Queued_Message.h:
+ * tao/Queued_Message.cpp: Added an argument in the constructor to
+ pass an allocator. Added a pure virtual method, clone () to the
+ interface.
+
+ * tao/Synch_Queued_Message.h:
+ * tao/Synch_Queued_Message.cpp:
+ * tao/Asynch_Queued_Message.h:
+ * tao/Asynch_Queued_Message.cpp: Implemented the method clone ().
+
+ * tao/Incoming_Mesage_Queue.cpp: Fixed a typo in a print
+ statement.
+
+ Fri Jul 12 14:10:14 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Transport.h:
+ * tao/Transport.cpp:
+ - Added a new enum enumerating the different message types
+ recognised within the ORB.
+
+ - Added two new methods send_reply_message_i () and
+ send_synch_message_helper_i (). The method
+ send_reply_message_i () seperates the path of the reply
+ message from the send_synchronous_message_i () method. The
+ method send_synch_message_helper_i () is a helper method
+ containing common code for the request and reply paths.
+
+ - The methods, send_message (), send_message_shared () and
+ send_message_shared_i () had a variable named
+ is_synchronous. The variable name has been changed to reflect
+ the right usage.
+
+ * tao/IIOP_Transport.cpp:
+ * tao/Strategies/DIOP_Transport.cpp:
+ * tao/Strategies/SHMIOP_Transport.cpp:
+ * tao/Strategies/UIOP_Transport.cpp: The variable names
+ is_synchronous was changed to reflect the usage.
+
+ * tao/Invocation.cpp:
+ * tao/Invocation.h: Instead of calling invoke () with a magic
+ number, used the enumeration defined in Transport.h.
+
+
Sun Jul 28 08:13:59 2002 Balachandran Natarajan <bala@cs.wustl.edu>
* orbsvcs/tests/Miop/McastHello/Makefile: Updated dependencies.
diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp
index 736a31208fd..6fca0aa65dd 100644
--- a/TAO/tao/Asynch_Queued_Message.cpp
+++ b/TAO/tao/Asynch_Queued_Message.cpp
@@ -1,5 +1,6 @@
#include "Asynch_Queued_Message.h"
-
+#include "debug.h"
+#include "ace/Malloc_T.h"
#include "ace/Log_Msg.h"
@@ -9,8 +10,10 @@ ACE_RCSID (tao,
TAO_Asynch_Queued_Message::
- TAO_Asynch_Queued_Message (const ACE_Message_Block *contents)
- : offset_ (0)
+ TAO_Asynch_Queued_Message (const ACE_Message_Block *contents,
+ ACE_Allocator *alloc)
+ : TAO_Queued_Message (alloc)
+ , offset_ (0)
{
this->size_ = contents->total_length ();
// @@ Use a pool for these guys!!
@@ -28,6 +31,16 @@ TAO_Asynch_Queued_Message::
}
}
+TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf,
+ size_t size,
+ ACE_Allocator *alloc)
+ : TAO_Queued_Message (alloc)
+ , size_ (size)
+ , offset_ (0)
+ , buffer_ (buf)
+{
+}
+
TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message (void)
{
// @@ Use a pool for these guys!
@@ -78,9 +91,81 @@ TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count)
this->state_changed (TAO_LF_Event::LFS_SUCCESS);
}
+
+TAO_Queued_Message *
+TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
+{
+ char *buf = 0;
+
+ // @@todo: Need to use a memory pool. But certain things need to
+ // change a bit in this class for that. Till then.
+
+ // Just allocate and copy data that needs to be sent, no point
+ // copying the whole buffer.
+ size_t sz = this->size_ - this->offset_;
+
+ ACE_NEW_RETURN (buf,
+ char[sz],
+ 0);
+
+ ACE_OS::memcpy (buf,
+ this->buffer_ + this->offset_,
+ sz);
+
+ TAO_Asynch_Queued_Message *qm = 0;
+
+ if (alloc)
+ {
+ ACE_NEW_MALLOC_RETURN (qm,
+ ACE_static_cast (TAO_Asynch_Queued_Message *,
+ alloc->malloc (sizeof (TAO_Asynch_Queued_Message))),
+ TAO_Asynch_Queued_Message (buf,
+ sz,
+ alloc),
+ 0);
+ }
+ else
+ {
+ // No allocator, so use the common heap!
+ if (TAO_debug_level == 4)
+ {
+ // This debug is for testing purposes!
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Asynch_Queued_Message::clone\n",
+ "Using global pool for allocation \n"));
+ }
+
+ ACE_NEW_RETURN (qm,
+ TAO_Asynch_Queued_Message (buf,
+ sz),
+ 0);
+ }
+
+ // Set the flag to indicate that <qm> is created on the heap.
+ if (qm)
+ qm->is_heap_created_ = 1;
+
+ return qm;
+}
+
void
TAO_Asynch_Queued_Message::destroy (void)
{
- // @@ Maybe it comes from a pool, we should do something about it.
- delete this;
+ if (this->is_heap_created_)
+ {
+ // If we have an allocator release the memory to the allocator
+ // pool.
+ if (this->allocator_)
+ {
+ ACE_DES_FREE (this,
+ this->allocator_->free,
+ TAO_Asynch_Queued_Message);
+
+ }
+ else // global release..
+ {
+ delete this;
+ }
+ }
+
}
diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h
index dbb4ca38bb0..02bd5e7e8d6 100644
--- a/TAO/tao/Asynch_Queued_Message.h
+++ b/TAO/tao/Asynch_Queued_Message.h
@@ -34,12 +34,16 @@ public:
/**
* @param contents The message block chain that must be sent.
*
+ * @param alloc Allocator used for creating <this> object.
+ *
* @todo I'm almost sure this class will require a callback
* interface for AMIs sent with SYNC_NONE policy. Those guys
* need to hear when the connection timeouts or closes, but
* cannot block waiting for the message to be delivered.
*/
- TAO_Asynch_Queued_Message (const ACE_Message_Block *contents);
+ TAO_Asynch_Queued_Message (const ACE_Message_Block *contents,
+ ACE_Allocator *alloc = 0);
+
/// Destructor
virtual ~TAO_Asynch_Queued_Message (void);
@@ -51,9 +55,28 @@ public:
virtual int all_data_sent (void) const;
virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const;
virtual void bytes_transferred (size_t &byte_count);
+ /// @@NOTE: No reason to belive why this would be called. But have
+ /// it here for the sake of uniformity.
+ virtual TAO_Queued_Message *clone (ACE_Allocator *alloc);
virtual void destroy (void);
//@}
+protected:
+ /// Constructor
+ /**
+ * @param buf The buffer that needs to be sent on the wire. The
+ * buffer will be owned by this class. The buffer will be
+ * deleted when the destructor is called and hence the
+ * buffer should always come off the heap!
+ *
+ * @param size The size of the buffer <buf> that is being handed
+ * over.
+ *
+ * @param alloc Allocator used for creating <this> object.
+ */
+ TAO_Asynch_Queued_Message (char *buf,
+ size_t size,
+ ACE_Allocator *alloc = 0);
private:
/// The number of bytes in the buffer
size_t size_;
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 7e89fce4ba5..d0691e7a65f 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -584,12 +584,21 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
#endif /* ACE_HAS_PURIFY */
// Initialze an output CDR on the stack
+ // NOTE: Dont jump to a conclusion as to why we are using the
+ // inpout_cdr and hence the global pool here. These pools will move
+ // to the lanes anyway at some point of time. Further, it would have
+ // been awesome to have this in TSS. But for some reason the cloning
+ // that happens when the ORB gets flow controlled while writing a
+ // reply is messing things up. We crash horribly. Doing this adds a
+ // lock, we need to set things like this -- put stuff in TSS here
+ // and transfer to global memory when we get flow controlled. We
+ // need to work on the message block to get it right!
TAO_OutputCDR output (repbuf,
sizeof repbuf,
TAO_ENCAP_BYTE_ORDER,
- this->orb_core_->output_cdr_buffer_allocator (),
- this->orb_core_->output_cdr_dblock_allocator (),
- this->orb_core_->output_cdr_msgblock_allocator (),
+ this->orb_core_->input_cdr_buffer_allocator (),
+ this->orb_core_->input_cdr_dblock_allocator (),
+ this->orb_core_->input_cdr_msgblock_allocator (),
this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
qd->major_version_,
qd->minor_version_,
@@ -882,7 +891,9 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
return -1;
}
- int result = transport->send_message (output);
+ int result = transport->send_message (output,
+ 0,
+ TAO_Transport::TAO_REPLY);
if (result == -1)
{
if (TAO_debug_level > 0)
@@ -1154,7 +1165,9 @@ TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
status_info);
// Send the message
- int result = transport->send_message (output);
+ int result = transport->send_message (output,
+ 0,
+ TAO_Transport::TAO_REPLY);
// Print out message if there is an error
if (result == -1)
@@ -1387,7 +1400,9 @@ TAO_GIOP_Message_Base::send_reply_exception (
*x) == -1)
return -1;
- return transport->send_message (output);
+ return transport->send_message (output,
+ 0,
+ TAO_Transport::TAO_REPLY);
}
void
@@ -1427,22 +1442,33 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
// request/reply id.
CORBA::ULong tmp = 0;
CORBA::ULong *id = &tmp;
+ char *tmp_id = 0;
if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST ||
ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY)
{
- // @@ Only works if ServiceContextList is empty....
if (minor < 2)
{
- id = ACE_reinterpret_cast (CORBA::ULong *,
- (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4));
-
+ // @@ Only works if ServiceContextList is empty....
+ tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4);
}
else
{
- id = ACE_reinterpret_cast (CORBA::ULong *,
- (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN));
+ tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
}
+#if !defined (ACE_DISABLE_SWAP_ON_READ)
+ if (byte_order == TAO_ENCAP_BYTE_ORDER)
+ {
+ id = ACE_reinterpret_cast (ACE_CDR::ULong*, tmp_id);
+ }
+ else
+ {
+ ACE_CDR::swap_4 (tmp_id, ACE_reinterpret_cast (char*,id));
+ }
+#else
+ id = ACE_reinterpret_cast(ACE_CDR::ULong*, tmp_id);
+#endif /* ACE_DISABLE_SWAP_ON_READ */
+
}
// Print.
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 9ba0eb66310..2b8adae0107 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -177,7 +177,7 @@ TAO_IIOP_Transport::send_request (TAO_Stub *stub,
int
TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub,
- int twoway,
+ int message_semantics,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
@@ -186,7 +186,7 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_shared (stub,
- twoway,
+ message_semantics,
stream.begin (),
max_wait_time);
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index 0331c30d572..ef589e39cb8 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -194,7 +194,7 @@ TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc)
{
// This debug is for testing purposes!
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Queued_Data[%d]::get_queued_data\n",
+ "TAO (%P|%t) - Queued_Data::get_queued_data\n",
"Using global pool for allocation \n"));
}
diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp
index a41f8fd538e..2459e750f00 100644
--- a/TAO/tao/Invocation.cpp
+++ b/TAO/tao/Invocation.cpp
@@ -370,7 +370,7 @@ TAO_GIOP_Invocation::prepare_header (CORBA::Octet response_flags
// Send request.
int
-TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous
+TAO_GIOP_Invocation::invoke (CORBA::Boolean write_semantics
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
@@ -386,7 +386,7 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous
this->transport_->send_request (this->stub_,
this->orb_core_,
this->out_stream_,
- is_synchronous,
+ write_semantics,
this->max_wait_time_);
//
@@ -589,7 +589,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request
}
// Just send the request, without trying to wait for the reply.
- int retval = TAO_GIOP_Invocation::invoke (1 ACE_ENV_ARG_PARAMETER);
+ int retval = TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_TWOWAY_REQUEST
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (retval);
if (retval != TAO_INVOKE_OK)
@@ -962,12 +963,12 @@ TAO_GIOP_Oneway_Invocation::invoke (ACE_ENV_SINGLE_ARG_DECL)
|| this->sync_scope_ == TAO::SYNC_EAGER_BUFFERING
|| this->sync_scope_ == TAO::SYNC_DELAYED_BUFFERING)
{
- return TAO_GIOP_Invocation::invoke (0
+ return TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_ONEWAY_REQUEST
ACE_ENV_ARG_PARAMETER);
}
if (this->sync_scope_ == Messaging::SYNC_WITH_TRANSPORT)
{
- return TAO_GIOP_Invocation::invoke (1
+ return TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_TWOWAY_REQUEST
ACE_ENV_ARG_PARAMETER);
}
diff --git a/TAO/tao/Invocation.h b/TAO/tao/Invocation.h
index ef2ef23d30d..8a1f8658409 100644
--- a/TAO/tao/Invocation.h
+++ b/TAO/tao/Invocation.h
@@ -206,7 +206,7 @@ protected:
* Returns TAO_INVOKE_RESTART if the write call failed and the
* request must be re-attempted.
*
- * @param is_synchronous If set invoke() does not return until the
+ * @param write_semantics If set invoke() does not return until the
* message is completely delivered to the underlying
* transport mechanism, or an error is detected.
*
@@ -214,7 +214,7 @@ protected:
* that the server closed the connection simply to release
* resources.
*/
- int invoke (CORBA::Boolean is_synchronous
+ int invoke (CORBA::Boolean write_semantics
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException));
diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp
index 68d308b2eeb..dfb957674f4 100644
--- a/TAO/tao/Queued_Message.cpp
+++ b/TAO/tao/Queued_Message.cpp
@@ -9,8 +9,10 @@
ACE_RCSID(tao, Queued_Message, "$Id$")
-TAO_Queued_Message::TAO_Queued_Message (void)
- : next_ (0)
+TAO_Queued_Message::TAO_Queued_Message (ACE_Allocator *alloc)
+ : allocator_ (alloc)
+ , is_heap_created_ (0)
+ , next_ (0)
, prev_ (0)
{
}
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
index 591aeb01b11..47a98735a4d 100644
--- a/TAO/tao/Queued_Message.h
+++ b/TAO/tao/Queued_Message.h
@@ -15,14 +15,15 @@
#include "ace/pre.h"
#include "corbafwd.h"
-#include "LF_Event.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-class ACE_Message_Block;
+#include "LF_Event.h"
+class ACE_Message_Block;
+class ACE_Allocator;
/**
* @class TAO_Queued_Message
*
@@ -65,7 +66,7 @@ class TAO_Export TAO_Queued_Message : public TAO_LF_Event
{
public:
/// Constructor
- TAO_Queued_Message (void);
+ TAO_Queued_Message (ACE_Allocator *alloc = 0);
/// Destructor
virtual ~TAO_Queued_Message (void);
@@ -142,7 +143,9 @@ public:
* method should update this counter
* @param iov The io vector
*/
- virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const = 0;
+ virtual void fill_iov (int iovcnt_max,
+ int &iovcnt,
+ iovec iov[]) const = 0;
/// Update the internal state, data has been sent.
/**
@@ -160,6 +163,18 @@ public:
*/
virtual void bytes_transferred (size_t &byte_count) = 0;
+ /// Clone this element
+ /*
+ * Clone the element and return a pointer to the cloned element on
+ * the heap.
+ *
+ * @param allocator Use the allocator for creating the new element
+ * on the heap. Remember, that the allocator will
+ * not be used allocate the data contained in this
+ * element.
+ */
+ virtual TAO_Queued_Message *clone (ACE_Allocator *allocator) = 0;
+
/// Reclaim resources
/**
* Reliable messages are allocated from the stack, thus they do not
@@ -170,6 +185,20 @@ public:
virtual void destroy (void) = 0;
//@}
+protected:
+ /*
+ * Allocator that was used to create <this> object on the heap. If the
+ * allocator is null then <this> is on stack.
+ */
+ ACE_Allocator *allocator_;
+
+ /*
+ * A flag that acts as a boolean to indicate whether <this> is on
+ * stack or heap. A non-zero value indicates that <this> was created
+ * on heap.
+ */
+ int is_heap_created_;
+
private:
/// Implement an intrusive double-linked list for the message queue
TAO_Queued_Message *next_;
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
index 0b9e65da5de..bf1c77f8d7d 100644
--- a/TAO/tao/Strategies/DIOP_Transport.cpp
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -267,7 +267,7 @@ TAO_DIOP_Transport::send_request (TAO_Stub *stub,
int
TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub,
- int twoway,
+ int message_semantics,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
@@ -280,7 +280,7 @@ TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_shared (stub,
- twoway,
+ message_semantics,
stream.begin (),
max_wait_time);
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index a3e32520f27..8d539d0b55c 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -240,7 +240,7 @@ TAO_SHMIOP_Transport::send_request (TAO_Stub *stub,
int
TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub,
- int twoway,
+ int message_semantics,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
@@ -253,7 +253,7 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_shared (stub,
- twoway,
+ message_semantics,
stream.begin (),
max_wait_time);
diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp
index bdc417892dd..30bc815433c 100644
--- a/TAO/tao/Strategies/UIOP_Transport.cpp
+++ b/TAO/tao/Strategies/UIOP_Transport.cpp
@@ -170,7 +170,7 @@ TAO_UIOP_Transport::send_request (TAO_Stub *stub,
int
TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub,
- int twoway,
+ int message_semantics,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
@@ -183,7 +183,7 @@ TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream,
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_shared (stub,
- twoway,
+ message_semantics,
stream.begin (),
max_wait_time);
diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp
index ce7bd413d8a..c6f62892147 100644
--- a/TAO/tao/Synch_Queued_Message.cpp
+++ b/TAO/tao/Synch_Queued_Message.cpp
@@ -1,4 +1,6 @@
#include "Synch_Queued_Message.h"
+#include "debug.h"
+#include "ace/Malloc_T.h"
#include "ace/Log_Msg.h"
@@ -8,14 +10,17 @@ ACE_RCSID (tao,
TAO_Synch_Queued_Message::
- TAO_Synch_Queued_Message (const ACE_Message_Block *contents)
- : contents_ (ACE_const_cast (ACE_Message_Block*,contents))
+ TAO_Synch_Queued_Message (const ACE_Message_Block *contents,
+ ACE_Allocator *alloc)
+ : TAO_Queued_Message (alloc)
+ , contents_ (ACE_const_cast (ACE_Message_Block*,contents))
, current_block_ (contents_)
{
}
TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message (void)
{
+
}
const ACE_Message_Block *
@@ -92,7 +97,71 @@ TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count)
this->state_changed (TAO_LF_Event::LFS_SUCCESS);
}
+TAO_Queued_Message *
+TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc)
+{
+ TAO_Synch_Queued_Message *qm = 0;
+
+ // Clone the message block.
+ // NOTE: We wantedly do the cloning from <current_block_> instead of
+ // starting from <contents_> since we dont want to clone blocks that
+ // have already been sent on the wire. Waste of memory and
+ // associated copying.
+ ACE_Message_Block *mb =
+ this->current_block_->clone ();
+
+ if (alloc)
+ {
+ ACE_NEW_MALLOC_RETURN (qm,
+ ACE_static_cast (TAO_Synch_Queued_Message *,
+ alloc->malloc (sizeof (TAO_Synch_Queued_Message))),
+ TAO_Synch_Queued_Message (mb,
+ alloc),
+ 0);
+ }
+ else
+ {
+ // No allocator, so use the common heap!
+ if (TAO_debug_level == 4)
+ {
+ // This debug is for testing purposes!
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Synch_Queued_Message::clone\n",
+ "Using global pool for allocation \n"));
+ }
+
+ ACE_NEW_RETURN (qm,
+ TAO_Synch_Queued_Message (mb),
+ 0);
+ }
+
+ // Set the flag to indicate that <qm> is created on the heap.
+ if (qm)
+ qm->is_heap_created_ = 1;
+
+ return qm;
+}
+
void
TAO_Synch_Queued_Message::destroy (void)
{
+ if (this->is_heap_created_)
+ {
+ ACE_Message_Block::release (this->contents_);
+ this->current_block_ = 0;
+
+ // If we have an allocator release the memory to the allocator
+ // pool.
+ if (this->allocator_)
+ {
+ ACE_DES_FREE (this,
+ this->allocator_->free,
+ TAO_Synch_Queued_Message);
+
+ }
+ else // global release..
+ {
+ delete this;
+ }
+ }
}
diff --git a/TAO/tao/Synch_Queued_Message.h b/TAO/tao/Synch_Queued_Message.h
index 2a8c638abf7..d9587fb10d9 100644
--- a/TAO/tao/Synch_Queued_Message.h
+++ b/TAO/tao/Synch_Queued_Message.h
@@ -47,8 +47,12 @@ public:
/// Constructor
/**
* @param contents The message block chain that must be sent.
+ *
+ * @param alloc The allocator that is used to allocate objects of
+ * this type.
*/
- TAO_Synch_Queued_Message (const ACE_Message_Block *contents);
+ TAO_Synch_Queued_Message (const ACE_Message_Block *contents,
+ ACE_Allocator *alloc = 0);
/// Destructor
virtual ~TAO_Synch_Queued_Message (void);
@@ -62,6 +66,7 @@ public:
virtual int all_data_sent (void) const;
virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const;
virtual void bytes_transferred (size_t &byte_count);
+ virtual TAO_Queued_Message *clone (ACE_Allocator *alloc);
virtual void destroy (void);
//@}
diff --git a/TAO/tao/TAO_Server_Request.cpp b/TAO/tao/TAO_Server_Request.cpp
index 4853ab6b0cb..c4b8c4a7759 100644
--- a/TAO/tao/TAO_Server_Request.cpp
+++ b/TAO/tao/TAO_Server_Request.cpp
@@ -210,7 +210,9 @@ TAO_ServerRequest::send_no_exception_reply (void)
reply_params);
// Send the message.
- int result = this->transport_->send_message (*this->outgoing_);
+ int result = this->transport_->send_message (*this->outgoing_,
+ 0,
+ TAO_Transport::TAO_REPLY);
if (result == -1)
{
@@ -230,7 +232,9 @@ TAO_ServerRequest::send_no_exception_reply (void)
void
TAO_ServerRequest::tao_send_reply (void)
{
- int result = this->transport_->send_message (*this->outgoing_);
+ int result = this->transport_->send_message (*this->outgoing_,
+ 0,
+ TAO_Transport::TAO_REPLY);
if (result == -1)
{
if (TAO_debug_level > 0)
@@ -310,7 +314,9 @@ TAO_ServerRequest::tao_send_reply_exception (CORBA::Exception &ex)
}
// Send the message
- if (this->transport_->send_message (*this->outgoing_) == -1)
+ if (this->transport_->send_message (*this->outgoing_,
+ 0,
+ TAO_Transport::TAO_REPLY) == -1)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("TAO: (%P|%t|%N|%l): ")
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 335a0e75c9c..194072dc587 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -479,7 +479,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
int
TAO_Transport::send_message_shared (TAO_Stub *stub,
- int is_synchronous,
+ int message_semantics,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time)
{
@@ -490,7 +490,7 @@ TAO_Transport::send_message_shared (TAO_Stub *stub,
if (this->check_event_handler_i ("Transport::send_message_shared") == -1)
return -1;
- r = this->send_message_shared_i (stub, is_synchronous,
+ r = this->send_message_shared_i (stub, message_semantics,
message_block, max_wait_time);
}
if (r == -1)
@@ -511,34 +511,18 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
synch_message.push_back (this->head_, this->tail_);
- int n = this->drain_queue_i ();
- if (n == -1)
- {
- synch_message.remove_from_list (this->head_, this->tail_);
- ACE_ASSERT (synch_message.next () == 0);
- ACE_ASSERT (synch_message.prev () == 0);
- return -1; // Error while sending...
- }
- else if (n == 1)
- {
- ACE_ASSERT (synch_message.all_data_sent ());
- ACE_ASSERT (synch_message.next () == 0);
- ACE_ASSERT (synch_message.prev () == 0);
- return 1; // Empty queue, message was sent..
- }
+ int n =
+ this->send_synch_message_helper_i (synch_message,
+ max_wait_time);
- ACE_ASSERT (n == 0); // Some data sent, but data remains.
+ if (n == -1 ||
+ n == 1)
+ return n;
- if (synch_message.all_data_sent ())
- {
- ACE_ASSERT (synch_message.next () == 0);
- ACE_ASSERT (synch_message.prev () == 0);
- return 1;
- }
+ ACE_ASSERT (n == 0);
// @todo: Check for timeouts!
// if (max_wait_time != 0 && errno == ETIME) return -1;
-
TAO_Flushing_Strategy *flushing_strategy =
this->orb_core ()->flushing_strategy ();
(void) flushing_strategy->schedule_output (this);
@@ -608,6 +592,87 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
}
+int
+TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
+ ACE_Time_Value *max_wait_time)
+{
+ // Dont clone now.. We could be sent in one shot!
+ TAO_Synch_Queued_Message synch_message (mb);
+
+ synch_message.push_back (this->head_,
+ this->tail_);
+
+ int n =
+ this->send_synch_message_helper_i (synch_message,
+ max_wait_time);
+
+ if (n == -1 ||
+ n == 1)
+ return n;
+
+ ACE_ASSERT (n == 0);
+
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_reply_message_i, "
+ "preparing to add to queue before leaving \n",
+ this->id ()));
+ }
+
+ // Till this point we shouldnt have any copying and that is the
+ // point anyway. Now, remove the node from the list
+ synch_message.remove_from_list (this->head_,
+ this->tail_);
+
+ // Clone the node that we have.
+ TAO_Queued_Message *msg =
+ synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
+
+ // Stick it in the queue
+ msg->push_back (this->head_,
+ this->tail_);
+
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+
+ (void) flushing_strategy->schedule_output (this);
+
+ return 1;
+}
+
+int
+TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
+ ACE_Time_Value * /*max_wait_time*/)
+{
+ // @@todo: Need to send timeouts for writing..
+ int n = this->drain_queue_i ();
+ if (n == -1)
+ {
+ synch_message.remove_from_list (this->head_, this->tail_);
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
+ return -1; // Error while sending...
+ }
+ else if (n == 1)
+ {
+ ACE_ASSERT (synch_message.all_data_sent ());
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
+ return 1; // Empty queue, message was sent..
+ }
+
+ ACE_ASSERT (n == 0); // Some data sent, but data remains.
+
+ if (synch_message.all_data_sent ())
+ {
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
+ return 1;
+ }
+
+ return 0;
+}
void
@@ -659,11 +724,6 @@ TAO_Transport::close_connection_shared (int disable_purge,
this->send_connection_closed_notifications ();
}
-
-
-
-
-
int
TAO_Transport::queue_is_empty_i (void)
{
@@ -671,8 +731,6 @@ TAO_Transport::queue_is_empty_i (void)
}
-
-
int
TAO_Transport::schedule_output_i (void)
{
@@ -1041,15 +1099,21 @@ TAO_Transport::send_connection_closed_notifications (void)
int
TAO_Transport::send_message_shared_i (TAO_Stub *stub,
- int is_synchronous,
+ int message_semantics,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time)
{
- if (is_synchronous)
+ if (message_semantics == TAO_Transport::TAO_TWOWAY_REQUEST)
{
return this->send_synchronous_message_i (message_block,
max_wait_time);
}
+ else if (message_semantics == TAO_Transport::TAO_REPLY)
+ {
+ return this->send_reply_message_i (message_block,
+ max_wait_time);
+ }
+
// Let's figure out if the message should be queued without trying
// to send first:
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 4cfc9320f98..b06181adfbc 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -39,6 +39,7 @@ class TAO_Connection_Handler;
class TAO_Pluggable_Messaging;
class TAO_Queued_Message;
+class TAO_Synch_Queued_Message;
class TAO_Resume_Handle;
@@ -444,8 +445,6 @@ protected:
* will reduce footprint and simplify the process of implementing a
* pluggable protocol.
*/
- // @@ this is broken once we add the lock b/c it returns the thing
- // we're trying to lock down! (CJC)
virtual ACE_Event_Handler * event_handler_i (void) = 0;
virtual TAO_Connection_Handler * connection_handler_i (void) = 0;
@@ -558,6 +557,15 @@ public:
int block = 0);
+ enum
+ {
+ TAO_ONEWAY_REQUEST = 0,
+ TAO_TWOWAY_REQUEST = 1,
+ TAO_REPLY
+ };
+
+
+
/// Prepare the waiting and demuxing strategy to receive a reply for
/// a new request.
/**
@@ -587,7 +595,7 @@ public:
virtual int send_request (TAO_Stub *stub,
TAO_ORB_Core *orb_core,
TAO_OutputCDR &stream,
- int is_synchronous,
+ int message_semantics,
ACE_Time_Value *max_time_wait) = 0;
@@ -601,12 +609,33 @@ public:
* header can finally be set to the proper value.
*
*/
- // @@ lockme
virtual int send_message (TAO_OutputCDR &stream,
TAO_Stub *stub = 0,
- int is_synchronous = 1,
+ int message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
ACE_Time_Value *max_time_wait = 0) = 0;
+
+ /// Sent the contents of <message_block>
+ /**
+ * @param stub The object reference used for this operation, useful
+ * to obtain the current policies.
+ * @param message_semantics If this is set to TAO_TWO_REQUEST
+ * this method will block until the operation is completely
+ * written on the wire. If it is set to other values this
+ * operation could return.
+ * @param message_block The CDR encapsulation of the GIOP message
+ * that must be sent. The message may consist of
+ * multiple Message Blocks chained through the cont()
+ * field.
+ * @param max_wait_time The maximum time that the operation can
+ * block, used in the implementation of timeouts.
+ */
+ int send_message_shared (TAO_Stub *stub,
+ int message_semantics,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+
protected:
/// Register the handler with the reactor.
/**
@@ -618,7 +647,6 @@ protected:
* thread-per-connection mode. In that case putting the connection
* in the Reactor would produce unpredictable results anyway.
*/
- // @@ lockme
virtual int register_handler_i (void) = 0;
/// Called by the handle_input_i (). This method is used to parse
@@ -687,23 +715,7 @@ public:
size_t &bytes_transferred,
ACE_Time_Value *max_wait_time = 0);
- /// Sent the contents of <message_block>
- /**
- * @param stub The object reference used for this operation, useful
- * to obtain the current policies.
- * @param is_synchronous If set this method will block until the
- * operation is completely written on the wire
- * @param message_block The CDR encapsulation of the GIOP message
- * that must be sent. The message may consist of
- * multiple Message Blocks chained through the cont()
- * field.
- * @param max_wait_time The maximum time that the operation can
- * block, used in the implementation of timeouts.
- */
- int send_message_shared (TAO_Stub *stub,
- int is_synchronous,
- const ACE_Message_Block *message_block,
- ACE_Time_Value *max_wait_time);
+
/// Send a message block chain, assuming the lock is held
int send_message_block_chain_i (const ACE_Message_Block *message_block,
@@ -802,6 +814,17 @@ private:
int send_synchronous_message_i (const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time);
+ /// Send a reply message, i.e. do not block until the message is on
+ /// the wire, but just return after adding them to the queue.
+ int send_reply_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+ /// A helper method used by <send_synchronous_message_i> and
+ /// <send_reply_message_i>. Reusable code that could be used by both
+ /// the methods.
+ int send_synch_message_helper_i (TAO_Synch_Queued_Message &s,
+ ACE_Time_Value *max_wait_time);
+
/// Check if the flush timer is still pending
int flush_timer_pending (void) const;
@@ -841,7 +864,7 @@ private:
/// Implement send_message_shared() assuming the handler_lock_ is
/// held.
int send_message_shared_i (TAO_Stub *stub,
- int is_synchronous,
+ int message_semantics,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time);