diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-02-12 18:23:32 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-02-12 18:23:32 +0000 |
commit | 52a75283d2a03b8ac4be54000614c611685604c7 (patch) | |
tree | 4b82492c75fbb801070c482fdc1a759a51f3537b | |
parent | 71961c9812e2f01a9842b0109de5a988bf1aca25 (diff) | |
download | ATCD-52a75283d2a03b8ac4be54000614c611685604c7.tar.gz |
ChangeLogTag:Mon Feb 12 10:15:47 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 96 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 7 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 15 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.h | 7 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.cpp | 8 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 257 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 30 |
7 files changed, 179 insertions, 241 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 16d4123403c..a6d4ffad94d 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,19 +1,37 @@ +Mon Feb 12 10:15:47 2001 Carlos O'Ryan <coryan@uci.edu> + + * tao/Transport.h: + * tao/Transport.cpp: + Add new method to send a message block chain. + The template method to send data takes an iovec argument. + + * tao/IIOP_Transport.h: + * tao/IIOP_Transport.cpp: + Implement the iovec-based send() template method. + + * tao/Queued_Message.cpp: + The cleanup code was broken. + + * tao/GIOP_Message_Base.cpp: + Use the message block chain method to send short critical + messages. + Fri Feb 09 10:50:47 2001 Carlos O'Ryan <coryan@uci.edu> * tao/Queued_Message.h: * tao/Queued_Message.inl: * tao/Queued_Message.cpp: - Add new class to represent a queued message in the outgoing - path. This class has to keep more than just the message block, - it also takes care of timeouts and signaling any waiting thread - when the message is sent. + Add new class to represent a queued message in the outgoing + path. This class has to keep more than just the message block, + it also takes care of timeouts and signaling any waiting thread + when the message is sent. * tao/Message_Sent_Callback.h: * tao/Message_Sent_Callback.inl: * tao/Message_Sent_Callback.cpp: - Define interface to signal threads waiting for a message to be - sent out, e.g. twoway requests blocked waiting for a queued - message. + Define interface to signal threads waiting for a message to be + sent out, e.g. twoway requests blocked waiting for a queued + message. * tao/Flushing_Strategy.h: * tao/Flushing_Strategy.cpp: @@ -21,40 +39,40 @@ Fri Feb 09 10:50:47 2001 Carlos O'Ryan <coryan@uci.edu> * tao/Block_Flushing_Strategy.cpp: * tao/Reactive_Flushing_Strategy.h * tao/Reactive_Flushing_Strategy.cpp: - New classes to control how the outgoing data is flushed, - either by blocking on write() or by using the reactor. + New classes to control how the outgoing data is flushed, + either by blocking on write() or by using the reactor. - * tao/Resource_Factory.h: + * tao/Resource_Factory.h: * tao/default_resource.h: * tao/default_resource.cpp: - Add new methods to create the flushing strategy. + Add new methods to create the flushing strategy. * tao/ORB_Core.h: * tao/ORB_Core.i: * tao/ORB_Core.cpp: - Add accessor for the flushing strategy. Notice that the - strategy is stateless so a single instance (per-ORB) is needed. + Add accessor for the flushing strategy. Notice that the + strategy is stateless so a single instance (per-ORB) is needed. * tao/Sync_Strategies.h: * tao/Sync_Strategies.cpp: - The Sync_Strategies have been simplified. They are now - stateless, and they only need to answer a couple of questions - (1) should a message be buffered, (2) should the ORB flush a - queue. + The Sync_Strategies have been simplified. They are now + stateless, and they only need to answer a couple of questions + (1) should a message be buffered, (2) should the ORB flush a + queue. * performance-tests/Latency/st_client.cpp: - Destroy the ORB on shutdown. + Destroy the ORB on shutdown. * tao/TAO.dsp: * tao/TAO_Static.dsp: - Add the new files to the TAO project files. + Add the new files to the TAO project files. * tao/Transport.h: * tao/Transport.inl: * tao/Transport.cpp: - Move much of the functionality of sending and outgoing message - queue up to the base transport class. Remove a lot of code - deailing with the previous (blocking) queues. + Move much of the functionality of sending and outgoing message + queue up to the base transport class. Remove a lot of code + deailing with the previous (blocking) queues. * tao/IIOP_Transport.cpp: * tao/IIOP_Connection_Handler.cpp: @@ -64,18 +82,18 @@ Fri Feb 09 10:50:47 2001 Carlos O'Ryan <coryan@uci.edu> * tao/Strategies/UIOP_Connection_Handler.cpp: * orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.cpp: * orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp: - Change the transport and connection handlers to use the new - outgoing message queue, the flushing strategy and the changes to - the SyncStrategy. + Change the transport and connection handlers to use the new + outgoing message queue, the flushing strategy and the changes to + the SyncStrategy. * tao/GIOP_Message_Base.cpp: - Propagate a few interface changes. + Propagate a few interface changes. * tests/LongWrites/LongWrites.dsw: * tests/LongWrites/client.dsp: * tests/LongWrites/server.dsp: * tests/LongWrites/run_test.pl: - Got the test to compile (and run) under NT. + Got the test to compile (and run) under NT. Wed Feb 7 14:31:48 2001 Jeff Parsons <parsons@cs.wustl.edu> @@ -94,18 +112,18 @@ Wed Feb 7 14:31:48 2001 Jeff Parsons <parsons@cs.wustl.edu> Wed Feb 7 12:57:35 2001 Frank Hunleth <fhunleth@cs.wustl.edu> - * tao/Acceptor_Registry.cpp: - Fixed redundant call to strtok_r in open_i when a protocol is - specified without an endpoint. This was causing core dumps on - Linux platforms. Thanks to Christian Ewald - <christian.ewald@zuehlke.com> for reporting this. + * tao/Acceptor_Registry.cpp: + Fixed redundant call to strtok_r in open_i when a protocol is + specified without an endpoint. This was causing core dumps on + Linux platforms. Thanks to Christian Ewald + <christian.ewald@zuehlke.com> for reporting this. Tue Feb 6 22:39:06 2001 Frank Hunleth <fhunleth@cs.wustl.edu> - * tao/Connection_Cache_Manager.cpp: - Fixed core dump in close_i () that was being caused by - unbinding hash map entries while iterating over the - hash map. + * tao/Connection_Cache_Manager.cpp: + Fixed core dump in close_i () that was being caused by + unbinding hash map entries while iterating over the + hash map. Tue Feb 06 15:49:41 2001 Carlos O'Ryan <coryan@uci.edu> @@ -117,9 +135,9 @@ Tue Feb 06 15:49:41 2001 Carlos O'Ryan <coryan@uci.edu> Tue Feb 6 16:34:16 2001 Yamuna Krishnamurthy <yamuna@cs.wustl.edu> - * orbsvcs/tests/AVStreams/Multicast/ftp.cpp: - - Fixed compile warnings on SunCC42. + * orbsvcs/tests/AVStreams/Multicast/ftp.cpp: + + Fixed compile warnings on SunCC42. Tue Feb 6 10:20:02 2001 Jeff Parsons <parsons@cs.wustl.edu> diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index de84da9427f..ab0dd9677a9 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -876,7 +876,8 @@ TAO_GIOP_Message_Base::send_error (TAO_Transport *transport) ACE_Message_Block message_block(&data_block); message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - int result = transport->send (&message_block); + size_t bt; + int result = transport->send_message_block_chain (&message_block, bt); if (result == -1) { if (TAO_debug_level > 0) @@ -1001,7 +1002,9 @@ TAO_GIOP_Message_Base:: ACE_Message_Block message_block(&data_block); message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - if (transport->send (&message_block) == -1) + size_t bt; + int result = transport->send_message_block_chain (&message_block, bt); + if (result == -1) { if (TAO_orbdebug) ACE_ERROR ((LM_ERROR, diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 25de3704e1f..858ce766fee 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -86,13 +86,16 @@ TAO_IIOP_Transport::idle (void) } ssize_t -TAO_IIOP_Transport::send (const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time, - size_t *bytes_transferred) +TAO_IIOP_Transport::send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *max_wait_time) { - return ACE::send (this->handle (), - message_block, - bytes_transferred); + ssize_t retval = this->service_handler ()->peer ().sendv (iov, iovcnt, + max_wait_time); + if (retval > 0) + bytes_transferred = retval; + + return retval; } ssize_t diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index e28be5436a8..df20d162ab5 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -75,10 +75,9 @@ public: virtual int idle (void); /// Write the complete Message_Block chain to the connection. - virtual ssize_t send (const ACE_Message_Block *mblk, - const ACE_Time_Value *s = 0, - size_t *bytes_transferred = 0); - + virtual ssize_t send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *timeout = 0); /// Read len bytes from into buf. virtual ssize_t recv (char *buf, diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp index f3d4a2ed8f8..d2613ca902b 100644 --- a/TAO/tao/Queued_Message.cpp +++ b/TAO/tao/Queued_Message.cpp @@ -21,7 +21,13 @@ TAO_Queued_Message::TAO_Queued_Message (ACE_Message_Block *contents, TAO_Queued_Message::~TAO_Queued_Message (void) { - ACE_Message_Block::release (this->contents_); + ACE_Message_Block *i = this->contents_; + while (i != 0) + { + ACE_Message_Block *cont = i->cont (); i->cont (0); + ACE_Message_Block::release (i); + i = cont; + } } void diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 94beb0d4f9b..d3ca90cee6f 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -51,169 +51,20 @@ TAO_Transport::~TAO_Transport (void) // delete this->buffering_queue_; - for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + TAO_Queued_Message *i = this->head_; + while (i != 0) { // @@ This is a good point to insert a flag to indicate that a // CloseConnection message was successfully received. i->connection_closed (); - i->destroy (); - } -} - -#if 0 -ssize_t -TAO_Transport::send_or_buffer (TAO_Stub *stub, - int two_way, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) -{ - if (stub == 0 || two_way) - return this->send (message_block, max_wait_time); + TAO_Queued_Message *tmp = i; + i = i->next (); - TAO_Sync_Strategy &sync_strategy = stub->sync_strategy (); - - return sync_strategy.send (*this, - *stub, - message_block, - max_wait_time); -} - -ssize_t -TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time) -{ - // Make sure we have a buffering queue and there are messages in it. - if (this->buffering_queue_ == 0 || - this->buffering_queue_->is_empty ()) - return 1; - - // Get the first message from the queue. - ACE_Message_Block *queued_message = 0; - ssize_t result = this->buffering_queue_->peek_dequeue_head (queued_message); - - // @@ What to do here on failures? - ACE_ASSERT (result != -1); - - // Actual network send. - size_t bytes_transferred = 0; - result = this->send (queued_message, - max_wait_time, - &bytes_transferred); - - // Cannot send completely: timed out. - if (result == -1 && - errno == ETIME) - { - if (bytes_transferred > 0) - { - // If successful in sending some of the data, reset the - // queue appropriately. - this->reset_queued_message (queued_message, - bytes_transferred); - - // Indicate some success. - return bytes_transferred; - } - - // Since we queue up the message, this is not an error. We can - // try next time around. - return 1; + tmp->destroy (); } - - // EOF or other errors. - if (result == -1 || - result == 0) - { - this->dequeue_all (); - return -1; - } - - // If successful in sending data, reset the queue appropriately. - this->reset_queued_message (queued_message, - bytes_transferred); - - // Everything was successfully delivered. - return result; -} - -void -TAO_Transport::reset_sent_message (ACE_Message_Block *message_block, - size_t bytes_delivered) -{ - this->reset_message (message_block, - bytes_delivered, - 0); -} - -void -TAO_Transport::reset_queued_message (ACE_Message_Block *message_block, - size_t bytes_delivered) -{ - this->reset_message (message_block, - bytes_delivered, - 1); } -void -TAO_Transport::reset_message (ACE_Message_Block *message_block, - size_t bytes_delivered, - int queued_message) -{ - while (bytes_delivered != 0) - { - // Our current message block chain. - ACE_Message_Block *current_message_block = message_block; - - int completely_delivered_current_message_block_chain = 0; - - while (current_message_block != 0 && - bytes_delivered != 0) - { - size_t current_message_block_length = - current_message_block->length (); - - int completely_delivered_current_message_block = - bytes_delivered >= current_message_block_length; - - size_t adjustment_size = - ACE_MIN (current_message_block_length, bytes_delivered); - - // Reset according to send size. - current_message_block->rd_ptr (adjustment_size); - - // If queued message, adjust the queue. - if (queued_message) - // Hand adjust <message_length>. - this->buffering_queue_->message_length ( - this->buffering_queue_->message_length () - adjustment_size); - - // Adjust <bytes_delivered>. - bytes_delivered -= adjustment_size; - - if (completely_delivered_current_message_block) - { - // Next message block in the continuation chain. - current_message_block = current_message_block->cont (); - - if (current_message_block == 0) - completely_delivered_current_message_block_chain = 1; - } - } - - if (completely_delivered_current_message_block_chain) - { - // Go to the next message block chain. - message_block = message_block->next (); - - // If queued message, adjust the queue. - if (queued_message) - // Release this <current_message_block>. - this->dequeue_head (); - } - } -} -#endif /* 0 */ - int TAO_Transport::handle_output () { @@ -252,16 +103,90 @@ TAO_Transport::handle_output () } int +TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block, + size_t &bytes_transferred, + ACE_Time_Value *timeout) +{ + bytes_transferred = 0; + + iovec iov[IOV_MAX]; + int iovcnt = 0; + + while (message_block != 0) + { + size_t message_block_length = + message_block->length (); + + // Check if this block has any data to be sent. + if (message_block_length > 0) + { + // Collect the data in the iovec. + iov[iovcnt].iov_base = message_block->rd_ptr (); + iov[iovcnt].iov_len = message_block_length; + + // Increment iovec counter. + iovcnt++; + + // The buffer is full make a OS call. @@ TODO find a way to + // find IOV_MAX for platforms that do not define it rather + // than simply setting IOV_MAX to some arbitrary value such + // as 16. + if (iovcnt == IOV_MAX) + { + size_t current_transfer = 0; + + ssize_t result = + this->send (iov, iovcnt, current_transfer, timeout); + + + // Errors. + if (result == -1 || result == 0) + return result; + + // Add to total bytes transferred. + bytes_transferred += current_transfer; + + // Reset iovec counter. + iovcnt = 0; + } + } + + // Select the next message block in the chain. + message_block = message_block->cont (); + } + + // Check for remaining buffers to be sent. This will happen when + // IOV_MAX is not a multiple of the number of message blocks. + if (iovcnt != 0) + { + size_t current_transfer = 0; + + ssize_t result = + this->send (iov, iovcnt, current_transfer, timeout); + // Errors. + if (result == -1 || result == 0) + return result; + + // Add to total bytes transferred. + bytes_transferred += current_transfer; + } + + // Return total bytes transferred. + return bytes_transferred; +} + +int TAO_Transport::send_current_message (void) { if (this->current_message_ == 0) return 1; - size_t byte_count; - ssize_t n = this->send (this->current_message_->mb (), - 0, /* it is non-blocking */ - &byte_count); - if (n == 0) + size_t bytes_transferred; + + ssize_t retval = + this->send_message_block_chain (this->current_message_->mb (), + bytes_transferred); + if (retval == 0) { // The connection was closed, return -1 to have the Reactor // close this transport and event handler @@ -270,7 +195,7 @@ TAO_Transport::send_current_message (void) // Because there can be a partial transfer we need to adjust the // number of bytes sent. - this->current_message_->bytes_transferred (byte_count); + this->current_message_->bytes_transferred (bytes_transferred); if (this->current_message_->done ()) { // Remove the current message.... @@ -279,13 +204,13 @@ TAO_Transport::send_current_message (void) this->current_message_ = 0; - if (n == -1) + if (retval == -1) return -1; return 1; } - if (n == -1) + if (retval == -1) { // ... timeouts and flow control are not real errors, the // connection is still valid and we must continue sending the @@ -347,16 +272,16 @@ TAO_Transport::send_message_i (TAO_Stub *stub, if (non_queued_message) { // ... in this case we must try to send the message first ... - + size_t byte_count; // @@ I don't think we want to hold the mutex here, however if // we release it we need to recheck the status of the transport // after we return... once I understand the final form for this // code I will re-visit this stuff. - ssize_t n = this->send (message_block, - 0, // non-blocking - &byte_count); + ssize_t n = this->send_message_block_chain (message_block, + byte_count, + 0 /* non-blocking */); if (n == 0) return -1; else if (n == -1 && errno != EWOULDBLOCK) diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 1505925b4e9..04dfef8f626 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -324,9 +324,9 @@ public: * bytes already on the OS I/O subsystem. * */ - virtual ssize_t send (const ACE_Message_Block *mblk, - const ACE_Time_Value *timeout = 0, - size_t *bytes_transferred = 0) = 0; + virtual ssize_t send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *timeout = 0) = 0; // Read len bytes from into buf. /** @@ -466,26 +466,10 @@ public: CORBA::Octet minor) = 0; //@} -protected: -#if 0 - /// Remove the first message from the outgoing queue. - void dequeue_head (void); - - /// Update the state of the outgoing queue, assuming that - /// bytes_delivered bytes have been sent already. - void reset_queued_message (ACE_Message_Block *message_block, - size_t bytes_delivered); - - /// Update the state of the outgoing queue, this time a complete - /// message was sent. - void reset_sent_message (ACE_Message_Block *message_block, - size_t bytes_delivered); - - /// Helper function used to implement the two methods above. - void reset_message (ACE_Message_Block *message_block, - size_t bytes_delivered, - int queued_message); -#endif /* 0 */ + /// Send a message block chain, + int send_message_block_chain (const ACE_Message_Block *message_block, + size_t &bytes_transferred, + ACE_Time_Value *max_wait_time = 0); protected: /// Sent the contents of <message_block>, blocking if required by |