diff options
author | bala <balanatarajan@users.noreply.github.com> | 2002-07-28 14:11:49 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2002-07-28 14:11:49 +0000 |
commit | 19a3f5ae3b7c94320e0b954edaaf765596b3d78c (patch) | |
tree | 7eee82ee25e5339a7203ba58e3b89490817c91aa /TAO | |
parent | a18a408a1db9890a8a809e84f57248a71f220651 (diff) | |
download | ATCD-19a3f5ae3b7c94320e0b954edaaf765596b3d78c.tar.gz |
ChangeLogTag: Sun Jul 28 09:03:08 2002 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'TAO')
-rw-r--r-- | TAO/ChangeLog | 92 | ||||
-rw-r--r-- | TAO/tao/Asynch_Queued_Message.cpp | 95 | ||||
-rw-r--r-- | TAO/tao/Asynch_Queued_Message.h | 25 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 50 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 4 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/Invocation.cpp | 11 | ||||
-rw-r--r-- | TAO/tao/Invocation.h | 4 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.h | 37 | ||||
-rw-r--r-- | TAO/tao/Strategies/DIOP_Transport.cpp | 4 | ||||
-rw-r--r-- | TAO/tao/Strategies/SHMIOP_Transport.cpp | 4 | ||||
-rw-r--r-- | TAO/tao/Strategies/UIOP_Transport.cpp | 4 | ||||
-rw-r--r-- | TAO/tao/Synch_Queued_Message.cpp | 73 | ||||
-rw-r--r-- | TAO/tao/Synch_Queued_Message.h | 7 | ||||
-rw-r--r-- | TAO/tao/TAO_Server_Request.cpp | 12 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 132 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 71 |
18 files changed, 529 insertions, 104 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index a80d3f846ee..9c147a073c3 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,95 @@ +Sun Jul 28 09:03:08 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + Merge from the branch bug_1125_stage_0. This merge solves the + dreaded problem of ORB building up the stack when a reply is + written. This should fix [BUG 1125]. + + Wed Jul 24 22:30:29 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Synch_Queued_Message.h: + * tao/Synch_Queued_Message.cpp: + * tao/Asynch_Queued_Message.h (TAO_Asynch_Queued_Message): + * tao/Asynch_Queued_Message.cpp: Added comments and spruced up + line spacings. + + * tao/Transport.cpp: Added a debug statement for logging + purposes. + + + Mon Jul 22 17:52:04 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/GIOP_Message_Base.cpp: We now use the global pool for + allocating memory for the return path. Using TSS created + problems when we were trying to move the unsent messages from + the TSS to global pool. The message block makes things to crash + horribly, especially at places that you have no clue about. + + * tao/Transport.cpp: Do not reset the allocators since it is not + required. + + Tue Jul 16 07:06:33 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Synch_Queued_Message.cpp: + * tao/Asynch_Queued_Message.cpp: Removed code that reset stuff + from the destructor. This caused problems when trying to use + the ACE_DES_FREE macro since it called the destructor first and + then free (). + + * tao/TAO_Server_Request.cpp: All calls to send_message () will + carry an argument mentioning that they are replies. + + * tao/GIOP_Message_Base.cpp: Fixed a problem in dump_msg () + method. The probelm was that we neved respected the byte-order + of the sending ORB when printing out request id's. Thanks to Kew + Whitney <Whitney.Kew@Invensys.com> + + * tao/Transport.cpp: Cosmetic fixes. + + Sun Jul 14 10:49:33 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Queued_Message.h: + * tao/Queued_Message.cpp: Added an argument in the constructor to + pass an allocator. Added a pure virtual method, clone () to the + interface. + + * tao/Synch_Queued_Message.h: + * tao/Synch_Queued_Message.cpp: + * tao/Asynch_Queued_Message.h: + * tao/Asynch_Queued_Message.cpp: Implemented the method clone (). + + * tao/Incoming_Mesage_Queue.cpp: Fixed a typo in a print + statement. + + Fri Jul 12 14:10:14 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Transport.h: + * tao/Transport.cpp: + - Added a new enum enumerating the different message types + recognised within the ORB. + + - Added two new methods send_reply_message_i () and + send_synch_message_helper_i (). The method + send_reply_message_i () seperates the path of the reply + message from the send_synchronous_message_i () method. The + method send_synch_message_helper_i () is a helper method + containing common code for the request and reply paths. + + - The methods, send_message (), send_message_shared () and + send_message_shared_i () had a variable named + is_synchronous. The variable name has been changed to reflect + the right usage. + + * tao/IIOP_Transport.cpp: + * tao/Strategies/DIOP_Transport.cpp: + * tao/Strategies/SHMIOP_Transport.cpp: + * tao/Strategies/UIOP_Transport.cpp: The variable names + is_synchronous was changed to reflect the usage. + + * tao/Invocation.cpp: + * tao/Invocation.h: Instead of calling invoke () with a magic + number, used the enumeration defined in Transport.h. + + Sun Jul 28 08:13:59 2002 Balachandran Natarajan <bala@cs.wustl.edu> * orbsvcs/tests/Miop/McastHello/Makefile: Updated dependencies. diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp index 736a31208fd..6fca0aa65dd 100644 --- a/TAO/tao/Asynch_Queued_Message.cpp +++ b/TAO/tao/Asynch_Queued_Message.cpp @@ -1,5 +1,6 @@ #include "Asynch_Queued_Message.h" - +#include "debug.h" +#include "ace/Malloc_T.h" #include "ace/Log_Msg.h" @@ -9,8 +10,10 @@ ACE_RCSID (tao, TAO_Asynch_Queued_Message:: - TAO_Asynch_Queued_Message (const ACE_Message_Block *contents) - : offset_ (0) + TAO_Asynch_Queued_Message (const ACE_Message_Block *contents, + ACE_Allocator *alloc) + : TAO_Queued_Message (alloc) + , offset_ (0) { this->size_ = contents->total_length (); // @@ Use a pool for these guys!! @@ -28,6 +31,16 @@ TAO_Asynch_Queued_Message:: } } +TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf, + size_t size, + ACE_Allocator *alloc) + : TAO_Queued_Message (alloc) + , size_ (size) + , offset_ (0) + , buffer_ (buf) +{ +} + TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message (void) { // @@ Use a pool for these guys! @@ -78,9 +91,81 @@ TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count) this->state_changed (TAO_LF_Event::LFS_SUCCESS); } + +TAO_Queued_Message * +TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc) +{ + char *buf = 0; + + // @@todo: Need to use a memory pool. But certain things need to + // change a bit in this class for that. Till then. + + // Just allocate and copy data that needs to be sent, no point + // copying the whole buffer. + size_t sz = this->size_ - this->offset_; + + ACE_NEW_RETURN (buf, + char[sz], + 0); + + ACE_OS::memcpy (buf, + this->buffer_ + this->offset_, + sz); + + TAO_Asynch_Queued_Message *qm = 0; + + if (alloc) + { + ACE_NEW_MALLOC_RETURN (qm, + ACE_static_cast (TAO_Asynch_Queued_Message *, + alloc->malloc (sizeof (TAO_Asynch_Queued_Message))), + TAO_Asynch_Queued_Message (buf, + sz, + alloc), + 0); + } + else + { + // No allocator, so use the common heap! + if (TAO_debug_level == 4) + { + // This debug is for testing purposes! + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Asynch_Queued_Message::clone\n", + "Using global pool for allocation \n")); + } + + ACE_NEW_RETURN (qm, + TAO_Asynch_Queued_Message (buf, + sz), + 0); + } + + // Set the flag to indicate that <qm> is created on the heap. + if (qm) + qm->is_heap_created_ = 1; + + return qm; +} + void TAO_Asynch_Queued_Message::destroy (void) { - // @@ Maybe it comes from a pool, we should do something about it. - delete this; + if (this->is_heap_created_) + { + // If we have an allocator release the memory to the allocator + // pool. + if (this->allocator_) + { + ACE_DES_FREE (this, + this->allocator_->free, + TAO_Asynch_Queued_Message); + + } + else // global release.. + { + delete this; + } + } + } diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h index dbb4ca38bb0..02bd5e7e8d6 100644 --- a/TAO/tao/Asynch_Queued_Message.h +++ b/TAO/tao/Asynch_Queued_Message.h @@ -34,12 +34,16 @@ public: /** * @param contents The message block chain that must be sent. * + * @param alloc Allocator used for creating <this> object. + * * @todo I'm almost sure this class will require a callback * interface for AMIs sent with SYNC_NONE policy. Those guys * need to hear when the connection timeouts or closes, but * cannot block waiting for the message to be delivered. */ - TAO_Asynch_Queued_Message (const ACE_Message_Block *contents); + TAO_Asynch_Queued_Message (const ACE_Message_Block *contents, + ACE_Allocator *alloc = 0); + /// Destructor virtual ~TAO_Asynch_Queued_Message (void); @@ -51,9 +55,28 @@ public: virtual int all_data_sent (void) const; virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const; virtual void bytes_transferred (size_t &byte_count); + /// @@NOTE: No reason to belive why this would be called. But have + /// it here for the sake of uniformity. + virtual TAO_Queued_Message *clone (ACE_Allocator *alloc); virtual void destroy (void); //@} +protected: + /// Constructor + /** + * @param buf The buffer that needs to be sent on the wire. The + * buffer will be owned by this class. The buffer will be + * deleted when the destructor is called and hence the + * buffer should always come off the heap! + * + * @param size The size of the buffer <buf> that is being handed + * over. + * + * @param alloc Allocator used for creating <this> object. + */ + TAO_Asynch_Queued_Message (char *buf, + size_t size, + ACE_Allocator *alloc = 0); private: /// The number of bytes in the buffer size_t size_; diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 7e89fce4ba5..d0691e7a65f 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -584,12 +584,21 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, #endif /* ACE_HAS_PURIFY */ // Initialze an output CDR on the stack + // NOTE: Dont jump to a conclusion as to why we are using the + // inpout_cdr and hence the global pool here. These pools will move + // to the lanes anyway at some point of time. Further, it would have + // been awesome to have this in TSS. But for some reason the cloning + // that happens when the ORB gets flow controlled while writing a + // reply is messing things up. We crash horribly. Doing this adds a + // lock, we need to set things like this -- put stuff in TSS here + // and transfer to global memory when we get flow controlled. We + // need to work on the message block to get it right! TAO_OutputCDR output (repbuf, sizeof repbuf, TAO_ENCAP_BYTE_ORDER, - this->orb_core_->output_cdr_buffer_allocator (), - this->orb_core_->output_cdr_dblock_allocator (), - this->orb_core_->output_cdr_msgblock_allocator (), + this->orb_core_->input_cdr_buffer_allocator (), + this->orb_core_->input_cdr_dblock_allocator (), + this->orb_core_->input_cdr_msgblock_allocator (), this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (), qd->major_version_, qd->minor_version_, @@ -882,7 +891,9 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, return -1; } - int result = transport->send_message (output); + int result = transport->send_message (output, + 0, + TAO_Transport::TAO_REPLY); if (result == -1) { if (TAO_debug_level > 0) @@ -1154,7 +1165,9 @@ TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport, status_info); // Send the message - int result = transport->send_message (output); + int result = transport->send_message (output, + 0, + TAO_Transport::TAO_REPLY); // Print out message if there is an error if (result == -1) @@ -1387,7 +1400,9 @@ TAO_GIOP_Message_Base::send_reply_exception ( *x) == -1) return -1; - return transport->send_message (output); + return transport->send_message (output, + 0, + TAO_Transport::TAO_REPLY); } void @@ -1427,22 +1442,33 @@ TAO_GIOP_Message_Base::dump_msg (const char *label, // request/reply id. CORBA::ULong tmp = 0; CORBA::ULong *id = &tmp; + char *tmp_id = 0; if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST || ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY) { - // @@ Only works if ServiceContextList is empty.... if (minor < 2) { - id = ACE_reinterpret_cast (CORBA::ULong *, - (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4)); - + // @@ Only works if ServiceContextList is empty.... + tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4); } else { - id = ACE_reinterpret_cast (CORBA::ULong *, - (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN)); + tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN); } +#if !defined (ACE_DISABLE_SWAP_ON_READ) + if (byte_order == TAO_ENCAP_BYTE_ORDER) + { + id = ACE_reinterpret_cast (ACE_CDR::ULong*, tmp_id); + } + else + { + ACE_CDR::swap_4 (tmp_id, ACE_reinterpret_cast (char*,id)); + } +#else + id = ACE_reinterpret_cast(ACE_CDR::ULong*, tmp_id); +#endif /* ACE_DISABLE_SWAP_ON_READ */ + } // Print. diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 9ba0eb66310..2b8adae0107 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -177,7 +177,7 @@ TAO_IIOP_Transport::send_request (TAO_Stub *stub, int TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, TAO_Stub *stub, - int twoway, + int message_semantics, ACE_Time_Value *max_wait_time) { // Format the message in the stream first @@ -186,7 +186,7 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, // This guarantees to send all data (bytes) or return an error. ssize_t n = this->send_message_shared (stub, - twoway, + message_semantics, stream.begin (), max_wait_time); diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp index 0331c30d572..ef589e39cb8 100644 --- a/TAO/tao/Incoming_Message_Queue.cpp +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -194,7 +194,7 @@ TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc) { // This debug is for testing purposes! ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Queued_Data[%d]::get_queued_data\n", + "TAO (%P|%t) - Queued_Data::get_queued_data\n", "Using global pool for allocation \n")); } diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index a41f8fd538e..2459e750f00 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -370,7 +370,7 @@ TAO_GIOP_Invocation::prepare_header (CORBA::Octet response_flags // Send request. int -TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous +TAO_GIOP_Invocation::invoke (CORBA::Boolean write_semantics ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { @@ -386,7 +386,7 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous this->transport_->send_request (this->stub_, this->orb_core_, this->out_stream_, - is_synchronous, + write_semantics, this->max_wait_time_); // @@ -589,7 +589,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request } // Just send the request, without trying to wait for the reply. - int retval = TAO_GIOP_Invocation::invoke (1 ACE_ENV_ARG_PARAMETER); + int retval = TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_TWOWAY_REQUEST + ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (retval); if (retval != TAO_INVOKE_OK) @@ -962,12 +963,12 @@ TAO_GIOP_Oneway_Invocation::invoke (ACE_ENV_SINGLE_ARG_DECL) || this->sync_scope_ == TAO::SYNC_EAGER_BUFFERING || this->sync_scope_ == TAO::SYNC_DELAYED_BUFFERING) { - return TAO_GIOP_Invocation::invoke (0 + return TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_ONEWAY_REQUEST ACE_ENV_ARG_PARAMETER); } if (this->sync_scope_ == Messaging::SYNC_WITH_TRANSPORT) { - return TAO_GIOP_Invocation::invoke (1 + return TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_TWOWAY_REQUEST ACE_ENV_ARG_PARAMETER); } diff --git a/TAO/tao/Invocation.h b/TAO/tao/Invocation.h index ef2ef23d30d..8a1f8658409 100644 --- a/TAO/tao/Invocation.h +++ b/TAO/tao/Invocation.h @@ -206,7 +206,7 @@ protected: * Returns TAO_INVOKE_RESTART if the write call failed and the * request must be re-attempted. * - * @param is_synchronous If set invoke() does not return until the + * @param write_semantics If set invoke() does not return until the * message is completely delivered to the underlying * transport mechanism, or an error is detected. * @@ -214,7 +214,7 @@ protected: * that the server closed the connection simply to release * resources. */ - int invoke (CORBA::Boolean is_synchronous + int invoke (CORBA::Boolean write_semantics ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)); diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp index 68d308b2eeb..dfb957674f4 100644 --- a/TAO/tao/Queued_Message.cpp +++ b/TAO/tao/Queued_Message.cpp @@ -9,8 +9,10 @@ ACE_RCSID(tao, Queued_Message, "$Id$") -TAO_Queued_Message::TAO_Queued_Message (void) - : next_ (0) +TAO_Queued_Message::TAO_Queued_Message (ACE_Allocator *alloc) + : allocator_ (alloc) + , is_heap_created_ (0) + , next_ (0) , prev_ (0) { } diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index 591aeb01b11..47a98735a4d 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -15,14 +15,15 @@ #include "ace/pre.h" #include "corbafwd.h" -#include "LF_Event.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -class ACE_Message_Block; +#include "LF_Event.h" +class ACE_Message_Block; +class ACE_Allocator; /** * @class TAO_Queued_Message * @@ -65,7 +66,7 @@ class TAO_Export TAO_Queued_Message : public TAO_LF_Event { public: /// Constructor - TAO_Queued_Message (void); + TAO_Queued_Message (ACE_Allocator *alloc = 0); /// Destructor virtual ~TAO_Queued_Message (void); @@ -142,7 +143,9 @@ public: * method should update this counter * @param iov The io vector */ - virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const = 0; + virtual void fill_iov (int iovcnt_max, + int &iovcnt, + iovec iov[]) const = 0; /// Update the internal state, data has been sent. /** @@ -160,6 +163,18 @@ public: */ virtual void bytes_transferred (size_t &byte_count) = 0; + /// Clone this element + /* + * Clone the element and return a pointer to the cloned element on + * the heap. + * + * @param allocator Use the allocator for creating the new element + * on the heap. Remember, that the allocator will + * not be used allocate the data contained in this + * element. + */ + virtual TAO_Queued_Message *clone (ACE_Allocator *allocator) = 0; + /// Reclaim resources /** * Reliable messages are allocated from the stack, thus they do not @@ -170,6 +185,20 @@ public: virtual void destroy (void) = 0; //@} +protected: + /* + * Allocator that was used to create <this> object on the heap. If the + * allocator is null then <this> is on stack. + */ + ACE_Allocator *allocator_; + + /* + * A flag that acts as a boolean to indicate whether <this> is on + * stack or heap. A non-zero value indicates that <this> was created + * on heap. + */ + int is_heap_created_; + private: /// Implement an intrusive double-linked list for the message queue TAO_Queued_Message *next_; diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp index 0b9e65da5de..bf1c77f8d7d 100644 --- a/TAO/tao/Strategies/DIOP_Transport.cpp +++ b/TAO/tao/Strategies/DIOP_Transport.cpp @@ -267,7 +267,7 @@ TAO_DIOP_Transport::send_request (TAO_Stub *stub, int TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream, TAO_Stub *stub, - int twoway, + int message_semantics, ACE_Time_Value *max_wait_time) { // Format the message in the stream first @@ -280,7 +280,7 @@ TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream, // This guarantees to send all data (bytes) or return an error. ssize_t n = this->send_message_shared (stub, - twoway, + message_semantics, stream.begin (), max_wait_time); diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index a3e32520f27..8d539d0b55c 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -240,7 +240,7 @@ TAO_SHMIOP_Transport::send_request (TAO_Stub *stub, int TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream, TAO_Stub *stub, - int twoway, + int message_semantics, ACE_Time_Value *max_wait_time) { // Format the message in the stream first @@ -253,7 +253,7 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream, // This guarantees to send all data (bytes) or return an error. ssize_t n = this->send_message_shared (stub, - twoway, + message_semantics, stream.begin (), max_wait_time); diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp index bdc417892dd..30bc815433c 100644 --- a/TAO/tao/Strategies/UIOP_Transport.cpp +++ b/TAO/tao/Strategies/UIOP_Transport.cpp @@ -170,7 +170,7 @@ TAO_UIOP_Transport::send_request (TAO_Stub *stub, int TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream, TAO_Stub *stub, - int twoway, + int message_semantics, ACE_Time_Value *max_wait_time) { // Format the message in the stream first @@ -183,7 +183,7 @@ TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream, // This guarantees to send all data (bytes) or return an error. ssize_t n = this->send_message_shared (stub, - twoway, + message_semantics, stream.begin (), max_wait_time); diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp index ce7bd413d8a..c6f62892147 100644 --- a/TAO/tao/Synch_Queued_Message.cpp +++ b/TAO/tao/Synch_Queued_Message.cpp @@ -1,4 +1,6 @@ #include "Synch_Queued_Message.h" +#include "debug.h" +#include "ace/Malloc_T.h" #include "ace/Log_Msg.h" @@ -8,14 +10,17 @@ ACE_RCSID (tao, TAO_Synch_Queued_Message:: - TAO_Synch_Queued_Message (const ACE_Message_Block *contents) - : contents_ (ACE_const_cast (ACE_Message_Block*,contents)) + TAO_Synch_Queued_Message (const ACE_Message_Block *contents, + ACE_Allocator *alloc) + : TAO_Queued_Message (alloc) + , contents_ (ACE_const_cast (ACE_Message_Block*,contents)) , current_block_ (contents_) { } TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message (void) { + } const ACE_Message_Block * @@ -92,7 +97,71 @@ TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count) this->state_changed (TAO_LF_Event::LFS_SUCCESS); } +TAO_Queued_Message * +TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc) +{ + TAO_Synch_Queued_Message *qm = 0; + + // Clone the message block. + // NOTE: We wantedly do the cloning from <current_block_> instead of + // starting from <contents_> since we dont want to clone blocks that + // have already been sent on the wire. Waste of memory and + // associated copying. + ACE_Message_Block *mb = + this->current_block_->clone (); + + if (alloc) + { + ACE_NEW_MALLOC_RETURN (qm, + ACE_static_cast (TAO_Synch_Queued_Message *, + alloc->malloc (sizeof (TAO_Synch_Queued_Message))), + TAO_Synch_Queued_Message (mb, + alloc), + 0); + } + else + { + // No allocator, so use the common heap! + if (TAO_debug_level == 4) + { + // This debug is for testing purposes! + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Synch_Queued_Message::clone\n", + "Using global pool for allocation \n")); + } + + ACE_NEW_RETURN (qm, + TAO_Synch_Queued_Message (mb), + 0); + } + + // Set the flag to indicate that <qm> is created on the heap. + if (qm) + qm->is_heap_created_ = 1; + + return qm; +} + void TAO_Synch_Queued_Message::destroy (void) { + if (this->is_heap_created_) + { + ACE_Message_Block::release (this->contents_); + this->current_block_ = 0; + + // If we have an allocator release the memory to the allocator + // pool. + if (this->allocator_) + { + ACE_DES_FREE (this, + this->allocator_->free, + TAO_Synch_Queued_Message); + + } + else // global release.. + { + delete this; + } + } } diff --git a/TAO/tao/Synch_Queued_Message.h b/TAO/tao/Synch_Queued_Message.h index 2a8c638abf7..d9587fb10d9 100644 --- a/TAO/tao/Synch_Queued_Message.h +++ b/TAO/tao/Synch_Queued_Message.h @@ -47,8 +47,12 @@ public: /// Constructor /** * @param contents The message block chain that must be sent. + * + * @param alloc The allocator that is used to allocate objects of + * this type. */ - TAO_Synch_Queued_Message (const ACE_Message_Block *contents); + TAO_Synch_Queued_Message (const ACE_Message_Block *contents, + ACE_Allocator *alloc = 0); /// Destructor virtual ~TAO_Synch_Queued_Message (void); @@ -62,6 +66,7 @@ public: virtual int all_data_sent (void) const; virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const; virtual void bytes_transferred (size_t &byte_count); + virtual TAO_Queued_Message *clone (ACE_Allocator *alloc); virtual void destroy (void); //@} diff --git a/TAO/tao/TAO_Server_Request.cpp b/TAO/tao/TAO_Server_Request.cpp index 4853ab6b0cb..c4b8c4a7759 100644 --- a/TAO/tao/TAO_Server_Request.cpp +++ b/TAO/tao/TAO_Server_Request.cpp @@ -210,7 +210,9 @@ TAO_ServerRequest::send_no_exception_reply (void) reply_params); // Send the message. - int result = this->transport_->send_message (*this->outgoing_); + int result = this->transport_->send_message (*this->outgoing_, + 0, + TAO_Transport::TAO_REPLY); if (result == -1) { @@ -230,7 +232,9 @@ TAO_ServerRequest::send_no_exception_reply (void) void TAO_ServerRequest::tao_send_reply (void) { - int result = this->transport_->send_message (*this->outgoing_); + int result = this->transport_->send_message (*this->outgoing_, + 0, + TAO_Transport::TAO_REPLY); if (result == -1) { if (TAO_debug_level > 0) @@ -310,7 +314,9 @@ TAO_ServerRequest::tao_send_reply_exception (CORBA::Exception &ex) } // Send the message - if (this->transport_->send_message (*this->outgoing_) == -1) + if (this->transport_->send_message (*this->outgoing_, + 0, + TAO_Transport::TAO_REPLY) == -1) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO: (%P|%t|%N|%l): ") diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 335a0e75c9c..194072dc587 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -479,7 +479,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, int TAO_Transport::send_message_shared (TAO_Stub *stub, - int is_synchronous, + int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { @@ -490,7 +490,7 @@ TAO_Transport::send_message_shared (TAO_Stub *stub, if (this->check_event_handler_i ("Transport::send_message_shared") == -1) return -1; - r = this->send_message_shared_i (stub, is_synchronous, + r = this->send_message_shared_i (stub, message_semantics, message_block, max_wait_time); } if (r == -1) @@ -511,34 +511,18 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, synch_message.push_back (this->head_, this->tail_); - int n = this->drain_queue_i (); - if (n == -1) - { - synch_message.remove_from_list (this->head_, this->tail_); - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - return -1; // Error while sending... - } - else if (n == 1) - { - ACE_ASSERT (synch_message.all_data_sent ()); - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - return 1; // Empty queue, message was sent.. - } + int n = + this->send_synch_message_helper_i (synch_message, + max_wait_time); - ACE_ASSERT (n == 0); // Some data sent, but data remains. + if (n == -1 || + n == 1) + return n; - if (synch_message.all_data_sent ()) - { - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - return 1; - } + ACE_ASSERT (n == 0); // @todo: Check for timeouts! // if (max_wait_time != 0 && errno == ETIME) return -1; - TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); (void) flushing_strategy->schedule_output (this); @@ -608,6 +592,87 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, } +int +TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, + ACE_Time_Value *max_wait_time) +{ + // Dont clone now.. We could be sent in one shot! + TAO_Synch_Queued_Message synch_message (mb); + + synch_message.push_back (this->head_, + this->tail_); + + int n = + this->send_synch_message_helper_i (synch_message, + max_wait_time); + + if (n == -1 || + n == 1) + return n; + + ACE_ASSERT (n == 0); + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_reply_message_i, " + "preparing to add to queue before leaving \n", + this->id ())); + } + + // Till this point we shouldnt have any copying and that is the + // point anyway. Now, remove the node from the list + synch_message.remove_from_list (this->head_, + this->tail_); + + // Clone the node that we have. + TAO_Queued_Message *msg = + synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); + + // Stick it in the queue + msg->push_back (this->head_, + this->tail_); + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + (void) flushing_strategy->schedule_output (this); + + return 1; +} + +int +TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message, + ACE_Time_Value * /*max_wait_time*/) +{ + // @@todo: Need to send timeouts for writing.. + int n = this->drain_queue_i (); + if (n == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return -1; // Error while sending... + } + else if (n == 1) + { + ACE_ASSERT (synch_message.all_data_sent ()); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; // Empty queue, message was sent.. + } + + ACE_ASSERT (n == 0); // Some data sent, but data remains. + + if (synch_message.all_data_sent ()) + { + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; + } + + return 0; +} void @@ -659,11 +724,6 @@ TAO_Transport::close_connection_shared (int disable_purge, this->send_connection_closed_notifications (); } - - - - - int TAO_Transport::queue_is_empty_i (void) { @@ -671,8 +731,6 @@ TAO_Transport::queue_is_empty_i (void) } - - int TAO_Transport::schedule_output_i (void) { @@ -1041,15 +1099,21 @@ TAO_Transport::send_connection_closed_notifications (void) int TAO_Transport::send_message_shared_i (TAO_Stub *stub, - int is_synchronous, + int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { - if (is_synchronous) + if (message_semantics == TAO_Transport::TAO_TWOWAY_REQUEST) { return this->send_synchronous_message_i (message_block, max_wait_time); } + else if (message_semantics == TAO_Transport::TAO_REPLY) + { + return this->send_reply_message_i (message_block, + max_wait_time); + } + // Let's figure out if the message should be queued without trying // to send first: diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 4cfc9320f98..b06181adfbc 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -39,6 +39,7 @@ class TAO_Connection_Handler; class TAO_Pluggable_Messaging; class TAO_Queued_Message; +class TAO_Synch_Queued_Message; class TAO_Resume_Handle; @@ -444,8 +445,6 @@ protected: * will reduce footprint and simplify the process of implementing a * pluggable protocol. */ - // @@ this is broken once we add the lock b/c it returns the thing - // we're trying to lock down! (CJC) virtual ACE_Event_Handler * event_handler_i (void) = 0; virtual TAO_Connection_Handler * connection_handler_i (void) = 0; @@ -558,6 +557,15 @@ public: int block = 0); + enum + { + TAO_ONEWAY_REQUEST = 0, + TAO_TWOWAY_REQUEST = 1, + TAO_REPLY + }; + + + /// Prepare the waiting and demuxing strategy to receive a reply for /// a new request. /** @@ -587,7 +595,7 @@ public: virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int is_synchronous, + int message_semantics, ACE_Time_Value *max_time_wait) = 0; @@ -601,12 +609,33 @@ public: * header can finally be set to the proper value. * */ - // @@ lockme virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub = 0, - int is_synchronous = 1, + int message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait = 0) = 0; + + /// Sent the contents of <message_block> + /** + * @param stub The object reference used for this operation, useful + * to obtain the current policies. + * @param message_semantics If this is set to TAO_TWO_REQUEST + * this method will block until the operation is completely + * written on the wire. If it is set to other values this + * operation could return. + * @param message_block The CDR encapsulation of the GIOP message + * that must be sent. The message may consist of + * multiple Message Blocks chained through the cont() + * field. + * @param max_wait_time The maximum time that the operation can + * block, used in the implementation of timeouts. + */ + int send_message_shared (TAO_Stub *stub, + int message_semantics, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); + + protected: /// Register the handler with the reactor. /** @@ -618,7 +647,6 @@ protected: * thread-per-connection mode. In that case putting the connection * in the Reactor would produce unpredictable results anyway. */ - // @@ lockme virtual int register_handler_i (void) = 0; /// Called by the handle_input_i (). This method is used to parse @@ -687,23 +715,7 @@ public: size_t &bytes_transferred, ACE_Time_Value *max_wait_time = 0); - /// Sent the contents of <message_block> - /** - * @param stub The object reference used for this operation, useful - * to obtain the current policies. - * @param is_synchronous If set this method will block until the - * operation is completely written on the wire - * @param message_block The CDR encapsulation of the GIOP message - * that must be sent. The message may consist of - * multiple Message Blocks chained through the cont() - * field. - * @param max_wait_time The maximum time that the operation can - * block, used in the implementation of timeouts. - */ - int send_message_shared (TAO_Stub *stub, - int is_synchronous, - const ACE_Message_Block *message_block, - ACE_Time_Value *max_wait_time); + /// Send a message block chain, assuming the lock is held int send_message_block_chain_i (const ACE_Message_Block *message_block, @@ -802,6 +814,17 @@ private: int send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time); + /// Send a reply message, i.e. do not block until the message is on + /// the wire, but just return after adding them to the queue. + int send_reply_message_i (const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); + + /// A helper method used by <send_synchronous_message_i> and + /// <send_reply_message_i>. Reusable code that could be used by both + /// the methods. + int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, + ACE_Time_Value *max_wait_time); + /// Check if the flush timer is still pending int flush_timer_pending (void) const; @@ -841,7 +864,7 @@ private: /// Implement send_message_shared() assuming the handler_lock_ is /// held. int send_message_shared_i (TAO_Stub *stub, - int is_synchronous, + int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time); |