From c0a0f2bd0a456f2734b187bd26f29812464908f2 Mon Sep 17 00:00:00 2001 From: bala Date: Fri, 12 Jul 2002 18:58:13 +0000 Subject: ChangeLogTag:Fri Jul 12 14:10:14 2002 Balachandran Natarajan --- TAO/tao/Asynch_Queued_Message.cpp | 6 +- TAO/tao/Asynch_Queued_Message.h | 3 +- TAO/tao/ChangeLog | 15 +++++ TAO/tao/IIOP_Transport.cpp | 4 +- TAO/tao/Invocation.cpp | 11 ++-- TAO/tao/Invocation.h | 4 +- TAO/tao/Queued_Message.h | 3 +- TAO/tao/Strategies/DIOP_Transport.cpp | 4 +- TAO/tao/Strategies/SHMIOP_Transport.cpp | 4 +- TAO/tao/Strategies/UIOP_Transport.cpp | 4 +- TAO/tao/Synch_Queued_Message.cpp | 6 +- TAO/tao/Synch_Queued_Message.h | 7 +- TAO/tao/Transport.cpp | 109 +++++++++++++++++++++----------- TAO/tao/Transport.h | 71 ++++++++++++++------- 14 files changed, 169 insertions(+), 82 deletions(-) create mode 100644 TAO/tao/ChangeLog diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp index 736a31208fd..3b87bbaf445 100644 --- a/TAO/tao/Asynch_Queued_Message.cpp +++ b/TAO/tao/Asynch_Queued_Message.cpp @@ -9,8 +9,10 @@ ACE_RCSID (tao, TAO_Asynch_Queued_Message:: - TAO_Asynch_Queued_Message (const ACE_Message_Block *contents) - : offset_ (0) + TAO_Asynch_Queued_Message (const ACE_Message_Block *contents, + ACE_Allocator *alloc) + : TAO_Queued_Message (alloc) + , offset_ (0) { this->size_ = contents->total_length (); // @@ Use a pool for these guys!! diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h index dbb4ca38bb0..a2708691abd 100644 --- a/TAO/tao/Asynch_Queued_Message.h +++ b/TAO/tao/Asynch_Queued_Message.h @@ -39,7 +39,8 @@ public: * need to hear when the connection timeouts or closes, but * cannot block waiting for the message to be delivered. */ - TAO_Asynch_Queued_Message (const ACE_Message_Block *contents); + TAO_Asynch_Queued_Message (const ACE_Message_Block *contents, + ACE_Allocator *alloc = 0); /// Destructor virtual ~TAO_Asynch_Queued_Message (void); diff --git a/TAO/tao/ChangeLog b/TAO/tao/ChangeLog new file mode 100644 index 00000000000..248048a0ea9 --- /dev/null +++ b/TAO/tao/ChangeLog @@ -0,0 +1,15 @@ +Fri Jul 12 14:10:14 2002 Balachandran Natarajan + + * M Asynch_Queued_Message.cpp +M Asynch_Queued_Message.h +M IIOP_Transport.cpp +M Invocation.cpp +M Invocation.h +M Queued_Message.h +M Synch_Queued_Message.cpp +M Synch_Queued_Message.h +M Transport.cpp +M Transport.h +M Strategies/DIOP_Transport.cpp +M Strategies/SHMIOP_Transport.cpp +M Strategies/UIOP_Transport.cpp diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 9ba0eb66310..e63bea76275 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -177,7 +177,7 @@ TAO_IIOP_Transport::send_request (TAO_Stub *stub, int TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, TAO_Stub *stub, - int twoway, + int write_semantics, ACE_Time_Value *max_wait_time) { // Format the message in the stream first @@ -186,7 +186,7 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, // This guarantees to send all data (bytes) or return an error. ssize_t n = this->send_message_shared (stub, - twoway, + write_semantics, stream.begin (), max_wait_time); diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index a41f8fd538e..2459e750f00 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -370,7 +370,7 @@ TAO_GIOP_Invocation::prepare_header (CORBA::Octet response_flags // Send request. int -TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous +TAO_GIOP_Invocation::invoke (CORBA::Boolean write_semantics ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { @@ -386,7 +386,7 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous this->transport_->send_request (this->stub_, this->orb_core_, this->out_stream_, - is_synchronous, + write_semantics, this->max_wait_time_); // @@ -589,7 +589,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request } // Just send the request, without trying to wait for the reply. - int retval = TAO_GIOP_Invocation::invoke (1 ACE_ENV_ARG_PARAMETER); + int retval = TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_TWOWAY_REQUEST + ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (retval); if (retval != TAO_INVOKE_OK) @@ -962,12 +963,12 @@ TAO_GIOP_Oneway_Invocation::invoke (ACE_ENV_SINGLE_ARG_DECL) || this->sync_scope_ == TAO::SYNC_EAGER_BUFFERING || this->sync_scope_ == TAO::SYNC_DELAYED_BUFFERING) { - return TAO_GIOP_Invocation::invoke (0 + return TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_ONEWAY_REQUEST ACE_ENV_ARG_PARAMETER); } if (this->sync_scope_ == Messaging::SYNC_WITH_TRANSPORT) { - return TAO_GIOP_Invocation::invoke (1 + return TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_TWOWAY_REQUEST ACE_ENV_ARG_PARAMETER); } diff --git a/TAO/tao/Invocation.h b/TAO/tao/Invocation.h index ef2ef23d30d..8a1f8658409 100644 --- a/TAO/tao/Invocation.h +++ b/TAO/tao/Invocation.h @@ -206,7 +206,7 @@ protected: * Returns TAO_INVOKE_RESTART if the write call failed and the * request must be re-attempted. * - * @param is_synchronous If set invoke() does not return until the + * @param write_semantics If set invoke() does not return until the * message is completely delivered to the underlying * transport mechanism, or an error is detected. * @@ -214,7 +214,7 @@ protected: * that the server closed the connection simply to release * resources. */ - int invoke (CORBA::Boolean is_synchronous + int invoke (CORBA::Boolean write_semantics ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)); diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index 591aeb01b11..a49bac11b5c 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -15,12 +15,13 @@ #include "ace/pre.h" #include "corbafwd.h" -#include "LF_Event.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "LF_Event.h" + class ACE_Message_Block; /** diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp index 0b9e65da5de..f8b68f6f5dc 100644 --- a/TAO/tao/Strategies/DIOP_Transport.cpp +++ b/TAO/tao/Strategies/DIOP_Transport.cpp @@ -267,7 +267,7 @@ TAO_DIOP_Transport::send_request (TAO_Stub *stub, int TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream, TAO_Stub *stub, - int twoway, + int write_semantics, ACE_Time_Value *max_wait_time) { // Format the message in the stream first @@ -280,7 +280,7 @@ TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream, // This guarantees to send all data (bytes) or return an error. ssize_t n = this->send_message_shared (stub, - twoway, + write_semantics, stream.begin (), max_wait_time); diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index a3e32520f27..1101ee436ce 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -240,7 +240,7 @@ TAO_SHMIOP_Transport::send_request (TAO_Stub *stub, int TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream, TAO_Stub *stub, - int twoway, + int write_semantics, ACE_Time_Value *max_wait_time) { // Format the message in the stream first @@ -253,7 +253,7 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream, // This guarantees to send all data (bytes) or return an error. ssize_t n = this->send_message_shared (stub, - twoway, + write_semantics, stream.begin (), max_wait_time); diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp index bdc417892dd..def3963d91f 100644 --- a/TAO/tao/Strategies/UIOP_Transport.cpp +++ b/TAO/tao/Strategies/UIOP_Transport.cpp @@ -170,7 +170,7 @@ TAO_UIOP_Transport::send_request (TAO_Stub *stub, int TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream, TAO_Stub *stub, - int twoway, + int write_semantics, ACE_Time_Value *max_wait_time) { // Format the message in the stream first @@ -183,7 +183,7 @@ TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream, // This guarantees to send all data (bytes) or return an error. ssize_t n = this->send_message_shared (stub, - twoway, + write_semantics, stream.begin (), max_wait_time); diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp index ce7bd413d8a..75aa7a8c7d5 100644 --- a/TAO/tao/Synch_Queued_Message.cpp +++ b/TAO/tao/Synch_Queued_Message.cpp @@ -8,8 +8,10 @@ ACE_RCSID (tao, TAO_Synch_Queued_Message:: - TAO_Synch_Queued_Message (const ACE_Message_Block *contents) - : contents_ (ACE_const_cast (ACE_Message_Block*,contents)) + TAO_Synch_Queued_Message (const ACE_Message_Block *contents, + ACE_Allocator *alloc) + : TAO_Queued_Message (alloc) + , contents_ (ACE_const_cast (ACE_Message_Block*,contents)) , current_block_ (contents_) { } diff --git a/TAO/tao/Synch_Queued_Message.h b/TAO/tao/Synch_Queued_Message.h index 2a8c638abf7..b9f4c315971 100644 --- a/TAO/tao/Synch_Queued_Message.h +++ b/TAO/tao/Synch_Queued_Message.h @@ -47,8 +47,13 @@ public: /// Constructor /** * @param contents The message block chain that must be sent. + * + * @param alloc The allocator that is used to allocate objects of + * this type. */ - TAO_Synch_Queued_Message (const ACE_Message_Block *contents); + TAO_Synch_Queued_Message (const ACE_Message_Block *contents; + + /// Destructor virtual ~TAO_Synch_Queued_Message (void); diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 335a0e75c9c..eb9e8ea890a 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -479,7 +479,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, int TAO_Transport::send_message_shared (TAO_Stub *stub, - int is_synchronous, + int write_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { @@ -490,7 +490,7 @@ TAO_Transport::send_message_shared (TAO_Stub *stub, if (this->check_event_handler_i ("Transport::send_message_shared") == -1) return -1; - r = this->send_message_shared_i (stub, is_synchronous, + r = this->send_message_shared_i (stub, write_semantics, message_block, max_wait_time); } if (r == -1) @@ -509,36 +509,18 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, // the message block. TAO_Synch_Queued_Message synch_message (mb); - synch_message.push_back (this->head_, this->tail_); + int n = + this->send_synch_message_helper_i (synch_message, + max_wait_time); - int n = this->drain_queue_i (); - if (n == -1) - { - synch_message.remove_from_list (this->head_, this->tail_); - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - return -1; // Error while sending... - } - else if (n == 1) - { - ACE_ASSERT (synch_message.all_data_sent ()); - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - return 1; // Empty queue, message was sent.. - } + if (n == -1 || + n == 1) + return n; - ACE_ASSERT (n == 0); // Some data sent, but data remains. - - if (synch_message.all_data_sent ()) - { - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - return 1; - } + ACE_ASSERT (n == 0); // @todo: Check for timeouts! // if (max_wait_time != 0 && errno == ETIME) return -1; - TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); (void) flushing_strategy->schedule_output (this); @@ -608,6 +590,68 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, } +int +TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, + ACE_Time_Value *max_wait_time) +{ + // Dont clone now.. We could be sent in one shot! + TAO_Synch_Queued_Message synch_message (mb); + + int n = + this->send_synch_message_helper_i (synch_message, + max_wait_time); + + if (n == -1 || + n == 1) + return n; + + ACE_ASSERT (n == 0); + + // Till this point we shouldnt have any copying and that is the + // point anyway. + // Now, remove the node from the list + + // Clone the node + + + + +} + +int +TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message, + ACE_Time_Value * /*max_wait_time*/) +{ + synch_message.push_back (this->head_, this->tail_); + + // @@todo: Need to send timeouts for writing.. + int n = this->drain_queue_i (); + if (n == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return -1; // Error while sending... + } + else if (n == 1) + { + ACE_ASSERT (synch_message.all_data_sent ()); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; // Empty queue, message was sent.. + } + + ACE_ASSERT (n == 0); // Some data sent, but data remains. + + if (synch_message.all_data_sent ()) + { + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; + } + + return 0; +} void @@ -659,11 +703,6 @@ TAO_Transport::close_connection_shared (int disable_purge, this->send_connection_closed_notifications (); } - - - - - int TAO_Transport::queue_is_empty_i (void) { @@ -671,8 +710,6 @@ TAO_Transport::queue_is_empty_i (void) } - - int TAO_Transport::schedule_output_i (void) { @@ -1041,11 +1078,11 @@ TAO_Transport::send_connection_closed_notifications (void) int TAO_Transport::send_message_shared_i (TAO_Stub *stub, - int is_synchronous, + int write_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { - if (is_synchronous) + if (write_semantics == TAO_Transport::TAO_TWOWAY_REQUEST) { return this->send_synchronous_message_i (message_block, max_wait_time); diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 4cfc9320f98..5f08cbe7326 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -39,6 +39,7 @@ class TAO_Connection_Handler; class TAO_Pluggable_Messaging; class TAO_Queued_Message; +class TAO_Synch_Queued_Message; class TAO_Resume_Handle; @@ -444,8 +445,6 @@ protected: * will reduce footprint and simplify the process of implementing a * pluggable protocol. */ - // @@ this is broken once we add the lock b/c it returns the thing - // we're trying to lock down! (CJC) virtual ACE_Event_Handler * event_handler_i (void) = 0; virtual TAO_Connection_Handler * connection_handler_i (void) = 0; @@ -558,6 +557,15 @@ public: int block = 0); + enum + { + TAO_ONEWAY_REQUEST = 0, + TAO_TWOWAY_REQUEST = 1, + TAO_REPLY + }; + + + /// Prepare the waiting and demuxing strategy to receive a reply for /// a new request. /** @@ -587,7 +595,7 @@ public: virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int is_synchronous, + int write_semantics, ACE_Time_Value *max_time_wait) = 0; @@ -601,12 +609,33 @@ public: * header can finally be set to the proper value. * */ - // @@ lockme virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub = 0, - int is_synchronous = 1, + int write_semantics = TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait = 0) = 0; + + /// Sent the contents of + /** + * @param stub The object reference used for this operation, useful + * to obtain the current policies. + * @param write_semantics If this is set to TAO_TWO_REQUEST + * this method will block until the operation is completely + * written on the wire. If it is set to other values this + * operation could return. + * @param message_block The CDR encapsulation of the GIOP message + * that must be sent. The message may consist of + * multiple Message Blocks chained through the cont() + * field. + * @param max_wait_time The maximum time that the operation can + * block, used in the implementation of timeouts. + */ + int send_message_shared (TAO_Stub *stub, + int write_semantics, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); + + protected: /// Register the handler with the reactor. /** @@ -618,7 +647,6 @@ protected: * thread-per-connection mode. In that case putting the connection * in the Reactor would produce unpredictable results anyway. */ - // @@ lockme virtual int register_handler_i (void) = 0; /// Called by the handle_input_i (). This method is used to parse @@ -687,23 +715,7 @@ public: size_t &bytes_transferred, ACE_Time_Value *max_wait_time = 0); - /// Sent the contents of - /** - * @param stub The object reference used for this operation, useful - * to obtain the current policies. - * @param is_synchronous If set this method will block until the - * operation is completely written on the wire - * @param message_block The CDR encapsulation of the GIOP message - * that must be sent. The message may consist of - * multiple Message Blocks chained through the cont() - * field. - * @param max_wait_time The maximum time that the operation can - * block, used in the implementation of timeouts. - */ - int send_message_shared (TAO_Stub *stub, - int is_synchronous, - const ACE_Message_Block *message_block, - ACE_Time_Value *max_wait_time); + /// Send a message block chain, assuming the lock is held int send_message_block_chain_i (const ACE_Message_Block *message_block, @@ -802,6 +814,17 @@ private: int send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time); + /// Send a reply message, i.e. do not block until the message is on + /// the wire, but just return after adding them to the queue. + int send_reply_message_i (const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); + + /// A helper method used by and + /// . Reusable code that could be used by both + /// the methods. + int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, + ACE_Time_Value *max_wait_time); + /// Check if the flush timer is still pending int flush_timer_pending (void) const; @@ -841,7 +864,7 @@ private: /// Implement send_message_shared() assuming the handler_lock_ is /// held. int send_message_shared_i (TAO_Stub *stub, - int is_synchronous, + int write_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time); -- cgit v1.2.1