From 1409b44bb54c0845485dcfedfcbea3d548e50900 Mon Sep 17 00:00:00 2001 From: coryan Date: Sun, 8 Apr 2001 04:15:46 +0000 Subject: ChangeLogTag:Sat Apr 7 21:13:48 2001 Carlos O'Ryan --- TAO/ChangeLogs/ChangeLog-02a | 58 +++++-- TAO/tao/GIOP_Message_Handler.cpp | 2 +- TAO/tao/Invocation.cpp | 31 ++-- TAO/tao/Invocation.h | 8 +- TAO/tao/Transport.cpp | 318 +++++++++++++++++++++----------------- TAO/tao/Transport.h | 37 ++++- TAO/tests/Big_Oneways/Session.cpp | 6 + 7 files changed, 275 insertions(+), 185 deletions(-) diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 8a00270af23..92f44ec032a 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,29 +1,59 @@ +Sat Apr 7 21:13:48 2001 Carlos O'Ryan + + * tao/Transport.cpp: + Separate the path for synchronous and asynchronous requests more + cleanly. + Merge the close_connection() changes from the main trunk, the + ORB was dead-locking on me. + Improve output for sent iovectors, now it is only generated if + TAO_debug_level==2 and the Log_Msg is locked to prevent other + threads from dumping the same messages. + + * tao/Transport.cpp (drain_queue_i): + New method used by the synchronous path to send data ASAP. + + * tao/Invocation.h: + * tao/Invocation.cpp (invoke): + Clarify the semantics of the argument, actually it + means that the ORB should wait until the data is delivered to + the wire. + Oneway invocations with the SYNC_WITH_TRANSPORT policy should + block until the data is delivered to the wire. + + * tao/GIOP_Message_Handler.cpp: + Only print the full contents of the received data when the debug + level is *exactly* 2. + + * tests/Big_Oneways/Session.cpp: + Fill up the messages with a repeating pattern, this is useful + during debugging. + Thu Apr 05 10:36:57 2001 Carlos O'Ryan * tao/Queued_Message.h: - Remove the declaration of the done() method, the semantics were - not clear and it was removed a couple of iterations ago. + Remove the declaration of the done() method, the semantics were + not clear and it was removed a couple of iterations ago. * tao/Block_Flushing_Strategy.cpp: * tao/Reactive_Flushing_Strategy.cpp: - Wait until all_data_sent() returns. + Wait until all_data_sent() returns. * tao/Transport.cpp: - Cleanup handle_output() no need to loop, the drain_queue() - method does that. - After trying to send a message and blocking the send_message_i() - method was not updating the new Asynch_Queued_Message with the - number of bytes sent. - drain_queue() loop was too complicated for its own sake. - - * tests/Big_Oneways/Session.h: + Cleanup handle_output() no need to loop, the drain_queue() + method does that. + After trying to send a message and blocking the send_message_i() + method was not updating the new Asynch_Queued_Message with the + number of bytes sent. + drain_queue() loop was too complicated for its own sake. + + * tests/Big_Oneways/Session.h: * tests/Big_Oneways/Test.idl: * tests/Big_Oneways/Session.cpp: - Add methods to prime the connections among multiple clients. + Add methods to prime the connections among multiple clients. * tests/Big_Oneways/server.cpp: - Increase timeout for initial session registration. Important - for manual executions. + Increase timeout for initial session registration. Important + for manual executions. Wed Apr 4 10:53:27 2001 Carlos O'Ryan diff --git a/TAO/tao/GIOP_Message_Handler.cpp b/TAO/tao/GIOP_Message_Handler.cpp index c24fb4b9334..5df3551a03f 100644 --- a/TAO/tao/GIOP_Message_Handler.cpp +++ b/TAO/tao/GIOP_Message_Handler.cpp @@ -509,7 +509,7 @@ TAO_GIOP_Message_Handler::read_messages (TAO_Transport *transport) return -1; } - if (TAO_debug_level > 6) + if (TAO_debug_level == 2) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - GIOP_Message_Handler::read_messages" diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index 9e6058c32aa..2970dc187ae 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -387,7 +387,7 @@ TAO_GIOP_Invocation::prepare_header (CORBA::Octet response_flags, // Send request. int -TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip, +TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { @@ -399,18 +399,11 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip, TAO_INVOKE_EXCEPTION); } - // @@ Alex: the flag will be tricky when we move to - // AMI: now it is used both to indicate the the CORBA request in - // a twoway and that the send_request() operation should block. - // Even for oneways: with AMI it is possible to wait for a - // response (empty) for oneways, just to make sure that they - // arrive, there are policies to control that. - int result = this->transport_->send_request (this->stub_, this->orb_core_, this->out_stream_, - is_roundtrip, + is_synchronous, this->max_wait_time_); // @@ -618,8 +611,7 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request, } // Just send the request, without trying to wait for the reply. - int retval = TAO_GIOP_Invocation::invoke (1, - ACE_TRY_ENV); + int retval = TAO_GIOP_Invocation::invoke (1, ACE_TRY_ENV); ACE_CHECK_RETURN (retval); if (retval != TAO_INVOKE_OK) @@ -855,8 +847,7 @@ TAO_GIOP_Twoway_Invocation::invoke (TAO_Exception_Data *excepts, { TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_INVOCATION_INVOKE_START); - int retval = this->invoke_i (0, - ACE_TRY_ENV); + int retval = this->invoke_i (0, ACE_TRY_ENV); ACE_CHECK_RETURN (retval); // A TAO_INVOKE_EXCEPTION status, but no exception raised means that @@ -973,18 +964,15 @@ int TAO_GIOP_Oneway_Invocation::invoke (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - if (this->sync_scope_ == TAO::SYNC_WITH_TRANSPORT - || this->sync_scope_ == TAO::SYNC_NONE + if (this->sync_scope_ == TAO::SYNC_NONE || this->sync_scope_ == TAO::SYNC_EAGER_BUFFERING || this->sync_scope_ == TAO::SYNC_DELAYED_BUFFERING) { - return TAO_GIOP_Invocation::invoke (0, - ACE_TRY_ENV); + return TAO_GIOP_Invocation::invoke (0, ACE_TRY_ENV); } - int retval = this->invoke_i (0, - ACE_TRY_ENV); - ACE_CHECK_RETURN (retval); + int retval = this->invoke_i (0, ACE_TRY_ENV); + ACE_CHECK_RETURN (TAO_INVOKE_EXCEPTION); // A TAO_INVOKE_EXCEPTION status, but no exception raised means that // we have a user exception. @@ -1042,8 +1030,7 @@ TAO_GIOP_Locate_Request_Invocation::invoke (CORBA::Environment &ACE_TRY_ENV) TAO_INVOKE_EXCEPTION); } - CORBA::ULong locate_status = this->invoke_i (1, - ACE_TRY_ENV); + CORBA::ULong locate_status = this->invoke_i (1, ACE_TRY_ENV); ACE_CHECK_RETURN (TAO_INVOKE_EXCEPTION); switch (locate_status) diff --git a/TAO/tao/Invocation.h b/TAO/tao/Invocation.h index 3296c919009..a031975d643 100644 --- a/TAO/tao/Invocation.h +++ b/TAO/tao/Invocation.h @@ -186,12 +186,16 @@ protected: /** * Returns TAO_INVOKE_RESTART if the write call failed and the * request must be re-attempted. - * @par + * + * @param is_synchronous If set invoke() does not return until the + * message is completely delivered to the underlying + * transport mechanism, or an error is detected. + * * Notice that the same profile is tried again because it may be * that the server closed the connection simply to release * resources. */ - int invoke (CORBA::Boolean is_roundtrip, + int invoke (CORBA::Boolean is_synchronous, CORBA_Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)); diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 814bd170f97..68893083f9a 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -155,12 +155,16 @@ TAO_Transport::provide_handle (ACE_Handle_Set &handle_set) } static void -dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer) +dump_iov (iovec *iov, int iovcnt, int id, + size_t current_transfer, + const char *location) { + ACE_Log_Msg::instance ()->acquire (); + ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_block_chain" + "TAO (%P|%t) - Transport[%d]::%s" " sending %d buffers\n", - id, iovcnt)); + id, location, iovcnt)); for (int i = 0; i != iovcnt && 0 < current_transfer; ++i) { size_t iov_len = iov[i].iov_len; @@ -170,28 +174,36 @@ dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer) iov_len = current_transfer; ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_block_chain" + "TAO (%P|%t) - Transport[%d]::%s" " buffer %d/%d has %d bytes\n", - id, + id, location, i, iovcnt, iov_len)); + size_t len; for (size_t offset = 0; offset < iov_len; offset += len) { + char header[1024]; + ACE_OS::sprintf (header, + "TAO - Transport[%d]::%s (%d/%d)\n", + id, location, offset, iov_len); + len = iov_len - offset; if (len > 512) len = 512; ACE_HEX_DUMP ((LM_DEBUG, ACE_static_cast(char*,iov[i].iov_base) + offset, len, - "TAO (%P|%t) - Transport::send_message_block_chain ")); + header)); } current_transfer -= iov_len; } ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_mesage_block_chain" + "TAO (%P|%t) - Transport[%d]::%s" " end of data\n", - id)); + id, location)); + + ACE_Log_Msg::instance ()->release (); } int @@ -230,9 +242,10 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block, ssize_t result = this->send (iov, iovcnt, current_transfer, timeout); - if (TAO_debug_level > 6) + if (TAO_debug_level == 2) { - dump_iov (iov, iovcnt, this->id (), current_transfer); + dump_iov (iov, iovcnt, this->id (), + current_transfer, "send_message_block_chain"); } // Add to total bytes transferred. @@ -260,9 +273,10 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block, ssize_t result = this->send (iov, iovcnt, current_transfer, timeout); - if (TAO_debug_level > 6) + if (TAO_debug_level == 2) { - dump_iov (iov, iovcnt, this->id (), current_transfer); + dump_iov (iov, iovcnt, this->id (), + current_transfer, "send_message_block_chain"); } // Add to total bytes transferred. @@ -279,162 +293,155 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block, int TAO_Transport::send_message_i (TAO_Stub *stub, - int twoway_flag, + int is_synchronous, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); - int queue_empty = (this->head_ == 0); + if (is_synchronous) + { + return this->send_synchronous_message_i (stub, + message_block, + max_wait_time); + } // Let's figure out if the message should be queued without trying // to send first: - int must_queue = 0; - if (this->head_ != 0) - must_queue = 1; - else if (!twoway_flag - && stub->sync_strategy ().must_queue (queue_empty)) - { - must_queue = 1; - } + int try_sending_first = 1; - if (must_queue) + int queue_empty = (this->head_ == 0); + + if (!queue_empty) + try_sending_first = 0; + else if (stub->sync_strategy ().must_queue (queue_empty)) + try_sending_first = 0; + + size_t byte_count = 0; + ssize_t n; + if (try_sending_first) { - // ... simply queue the message ... + // ... in this case we must try to send the message first ... if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_message_i, " - "message is queued\n", + "trying to send the message\n", this->id ())); } - TAO_Queued_Message *queued_message = 0; - ACE_NEW_RETURN (queued_message, - TAO_Asynch_Queued_Message (message_block), - -1); - queued_message->push_back (this->head_, this->tail_); + // @@ I don't think we want to hold the mutex here, however if + // we release it we need to recheck the status of the transport + // after we return... once I understand the final form for this + // code I will re-visit this decision + n = this->send_message_block_chain (message_block, + byte_count, + max_wait_time); + if (n == 0) + return -1; // EOF + else if (n == -1) + { + // ... if this is just an EWOULDBLOCK we must schedule the + // message for later, if it is ETIME we still have to send + // the complete message, because cutting off the message at + // this point will destroy the synchronization with the + // server ... + if (errno != EWOULDBLOCK && errno != ETIME) + { + return -1; + } + } - if (this->must_flush_queue_i (stub)) + // ... let's figure out if the complete message was sent ... + if (message_block->total_length () == byte_count) { - ace_mon.release (); - int result = flushing_strategy->flush_message (this, - this->tail_); - return result; + // Done, just return. Notice that there are no allocations + // or copies up to this point (though some fancy calling + // back and forth). + // This is the common case for the critical path, it should + // be fast. + return 0; } - return 0; } - // ... in this case we must try to send the message first ... + // ... either the message must be queued or we need to queue it + // because it was not completely sent out ... if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_message_i, " - "trying to send the message\n", + "message is queued\n", this->id ())); } - size_t byte_count; + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message (message_block), + -1); + queued_message->bytes_transferred (byte_count); + queued_message->push_back (this->head_, this->tail_); - // @@ I don't think we want to hold the mutex here, however if - // we release it we need to recheck the status of the transport - // after we return... once I understand the final form for this - // code I will re-visit this decision - ssize_t n = this->send_message_block_chain (message_block, - byte_count, - max_wait_time); - if (n == 0) - return -1; // EOF - else if (n == -1) + // ... if the queue is full we need to activate the output on the + // queue ... + if (this->must_flush_queue_i (stub)) { - // ... if this is just an EWOULDBLOCK we must schedule the - // message for later ... - if (errno != EWOULDBLOCK) - { - return -1; - } + this->orb_core ()->flushing_strategy ()->schedule_output (this); } - // ... let's figure out if the complete message was sent ... - if (message_block->total_length () == byte_count) - { - // Done, just return. Notice that there are no allocations - // or copies up to this point (though some fancy calling - // back and forth). - // This is the common case for the critical path, it should - // be fast. - return 0; - } + // ... in any case, check for timeouts and report them to the + // application ... + if (max_wait_time != 0 && n == -1 && errno == ETIME) + return -1; - // ... the message was only partially sent, schedule reactive - // output... - flushing_strategy->schedule_output (this); + return 0; +} - // ... and set it as the current message ... - if (twoway_flag) - { - // ... we are going to block, so there is no need to clone - // the message block... - // @@ It seems wasteful to allocate a TAO_Queued_Message in - // this case, but it is simpler to do it this way. - TAO_Synch_Queued_Message synch_message (message_block); - - synch_message.bytes_transferred (byte_count); - synch_message.push_back (this->head_, this->tail_); - - // Release the mutex, other threads may modify the queue as we - // block for a long time writing out data. - int result; - { - ace_mon.release (); - result = flushing_strategy->flush_message (this, - &synch_message); - - ace_mon.acquire (); - } - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - synch_message.destroy (); - return result; - } +int +TAO_Transport::send_synchronous_message_i (TAO_Stub *stub, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) +{ + // We are going to block, so there is no need to clone + // the message block. + TAO_Synch_Queued_Message synch_message (message_block); - TAO_Queued_Message *queued_message = 0; - ACE_NEW_RETURN (queued_message, - TAO_Asynch_Queued_Message (message_block), - -1); + synch_message.push_back (this->head_, this->tail_); - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - " queued anyway, %d bytes sent\n", - this->id (), - byte_count)); - } + int n = this->drain_queue_i (); + if (n == -1) + return -1; // Error while sending... + else if (n == 1) + return 1; // Empty queue, message was sent.. - // ... insert at the head of the queue, we can use push_back() - // because the queue is empty ... + ACE_ASSERT (n == 0); // Some data sent, but data remains. - queued_message->bytes_transferred (byte_count); - queued_message->push_back (this->head_, this->tail_); + if (synch_message.all_data_sent ()) + return 1; - // ... this is not a twoway. We must check if the buffering - // constraints have been reached, if so, then we should start - // flushing out data.... + // @todo: Check for timeouts! + // if (max_wait_time != 0 && errno == ETIME) return -1; - if (this->must_flush_queue_i (stub)) - { - ace_mon.release (); - int result = flushing_strategy->flush_message (this, - this->tail_); - return result; - } - return 0; + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + (void) flushing_strategy->schedule_output (this); + + // Release the mutex, other threads may modify the queue as we + // block for a long time writing out data. + int result; + { + typedef ACE_Reverse_Lock TAO_REVERSE_SYNCH_MUTEX; + TAO_REVERSE_SYNCH_MUTEX reverse (this->queue_mutex_); + + ACE_GUARD_RETURN (TAO_REVERSE_SYNCH_MUTEX, ace_mon, reverse, -1); + result = flushing_strategy->flush_message (this, + &synch_message); + + } + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return result; } int @@ -547,18 +554,29 @@ TAO_Transport::make_idle (void) void TAO_Transport::close_connection (void) { - ACE_MT (ACE_GUARD (ACE_Lock, - guard, - *this->handler_lock_)); + ACE_Event_Handler *eh = 0; + { + ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_)); - // Call handle close on the handler. - // The event handler is as common as we can get - ACE_Event_Handler *eh = this->event_handler_i (); - if (eh) - eh->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::ALL_EVENTS_MASK); + eh = this->event_handler_i (); + + this->transition_handler_state_i (); + + if (eh == 0) + return; + } + + // Close the underlying connection, it is enough to get an + // Event_Handler pointer to do this, so we can factor out the code + // in the base TAO_Transport class. + (void) eh->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::ALL_EVENTS_MASK); // Purge the entry + // @todo This is redundant, handle_close() eventually calls + // this->connection_handler_closing(), that performs the same + // work, for some reason they hold the mutex while they do + // that work though. this->orb_core_->transport_cache ().purge_entry (this->cache_map_entry_); for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) @@ -580,11 +598,14 @@ TAO_Transport::send (iovec *iov, int iovcnt, // if there's no associated event handler, then we act like a null transport if (this->event_handler_i () == 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) transport %d (tag=%d) send() ") - ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"), - this->id (), - this->tag_)); + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) transport %d (tag=%d) send() ") + ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"), + this->id (), + this->tag_)); + } errno = ENOENT; return -1; } @@ -716,6 +737,12 @@ TAO_Transport::drain_queue (void) { ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); + return this->drain_queue_i (); +} + +int +TAO_Transport::drain_queue_i (void) +{ if (this->head_ == 0) return 1; @@ -743,6 +770,12 @@ TAO_Transport::drain_queue (void) ssize_t retval = this->send (iov, iovcnt, byte_count); + if (TAO_debug_level == 2) + { + dump_iov (iov, iovcnt, this->id (), + byte_count, "drain_queue_i"); + } + // ... now we need to update the queue, removing elements // that have been sent, and updating the last element if it // was only partially sent ... @@ -775,6 +808,12 @@ TAO_Transport::drain_queue (void) ssize_t retval = this->send (iov, iovcnt, byte_count); + if (TAO_debug_level == 2) + { + dump_iov (iov, iovcnt, this->id (), + byte_count, "drain_queue_i"); + } + this->cleanup_queue (byte_count); iovcnt = 0; @@ -788,6 +827,7 @@ TAO_Transport::drain_queue (void) return 0; return -1; } + ACE_ASSERT (byte_count != 0); if (this->head_ == 0) return 1; diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 6e296a72fe5..e132d080ceb 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -507,7 +507,7 @@ public: virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int twoway, + int is_synchronous, ACE_Time_Value *max_time_wait) = 0; /// This is a request for the transport object to write a request @@ -536,7 +536,7 @@ public: // @@ lockme virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub = 0, - int twoway = 1, + int is_synchronous = 1, ACE_Time_Value *max_time_wait = 0) = 0; /// Callback to read incoming data @@ -639,15 +639,29 @@ public: size_t &bytes_transferred, ACE_Time_Value *max_wait_time = 0); - /// Sent the contents of , blocking if required by + /// Sent the contents of + /** + * @todo This method name sucks, but send_message() was already + * taken by other silly methods! + * + * @param stub The object reference used for this operation, useful + * to obtain the current policies. + * @param is_synchronous If set this method will block until the + * operation is completely written on the wire + * @param message_block The CDR encapsulation of the GIOP message + * that must be sent. The message may consist of + * multiple Message Blocks chained through the cont() + * field. + * @param max_wait_time The maximum time that the operation can + * block, used in the implementation of timeouts. + * + */ /// the twoway flag or by the current policies in the stub. int send_message_i (TAO_Stub *stub, - int twoway_flag, + int is_synchronous, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time); - // TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *& cache_map_entry (void); - /// Cache management void mark_invalid (void); @@ -661,7 +675,7 @@ public: int cancel_output (void); private: - /// Try to send the current message. + /// Send some of the data in the queue. /** * As the outgoing data is drained this method is invoked to send as * much of the current message as possible. @@ -671,6 +685,9 @@ private: */ int drain_queue (void); + /// Implement drain_queue() assuming the lock is held + int drain_queue_i (void); + /// Cleanup the queue. /** * Exactly bytes have been sent, the queue must be @@ -686,6 +703,12 @@ private: /// Check if the buffering constraints have been reached int must_flush_queue_i (TAO_Stub *stub); + /// Send a synchronous message, i.e. block until the message is on + /// the wire + int send_synchronous_message_i (TAO_Stub *stub, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); + /// Prohibited ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&)) ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&)) diff --git a/TAO/tests/Big_Oneways/Session.cpp b/TAO/tests/Big_Oneways/Session.cpp index 409a01595b0..0ce9bd9f2ce 100644 --- a/TAO/tests/Big_Oneways/Session.cpp +++ b/TAO/tests/Big_Oneways/Session.cpp @@ -47,6 +47,11 @@ Session::svc (void) Test::Payload payload (this->payload_size_); payload.length (this->payload_size_); + for (CORBA::ULong j = 0; j != this->payload_size_; ++j) + { + payload[j] = j % 256; + } + // Get the number of peers just once. CORBA::ULong session_count = this->other_sessions_.length (); @@ -115,6 +120,7 @@ Session::validate_connections (CORBA::Environment &ACE_TRY_ENV) ACE_CATCHANY {} ACE_ENDTRY; } } + ACE_DEBUG ((LM_DEBUG, "(%P|%t) connections are ready\n")); } void -- cgit v1.2.1