summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-08 04:15:46 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-08 04:15:46 +0000
commit1409b44bb54c0845485dcfedfcbea3d548e50900 (patch)
tree53dff4d3630c16c87710da79a60f81666782440c
parent1dddc51e6ff3e372eabb661bda47956d7a1a0c5e (diff)
downloadATCD-1409b44bb54c0845485dcfedfcbea3d548e50900.tar.gz
ChangeLogTag:Sat Apr 7 21:13:48 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a58
-rw-r--r--TAO/tao/GIOP_Message_Handler.cpp2
-rw-r--r--TAO/tao/Invocation.cpp31
-rw-r--r--TAO/tao/Invocation.h8
-rw-r--r--TAO/tao/Transport.cpp318
-rw-r--r--TAO/tao/Transport.h37
-rw-r--r--TAO/tests/Big_Oneways/Session.cpp6
7 files changed, 275 insertions, 185 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 8a00270af23..92f44ec032a 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,29 +1,59 @@
+Sat Apr 7 21:13:48 2001 Carlos O'Ryan <coryan@uci.edu>
+
+ * tao/Transport.cpp:
+ Separate the path for synchronous and asynchronous requests more
+ cleanly.
+ Merge the close_connection() changes from the main trunk, the
+ ORB was dead-locking on me.
+ Improve output for sent iovectors, now it is only generated if
+ TAO_debug_level==2 and the Log_Msg is locked to prevent other
+ threads from dumping the same messages.
+
+ * tao/Transport.cpp (drain_queue_i):
+ New method used by the synchronous path to send data ASAP.
+
+ * tao/Invocation.h:
+ * tao/Invocation.cpp (invoke):
+ Clarify the semantics of the <twoway_flag> argument, actually it
+ means that the ORB should wait until the data is delivered to
+ the wire.
+ Oneway invocations with the SYNC_WITH_TRANSPORT policy should
+ block until the data is delivered to the wire.
+
+ * tao/GIOP_Message_Handler.cpp:
+ Only print the full contents of the received data when the debug
+ level is *exactly* 2.
+
+ * tests/Big_Oneways/Session.cpp:
+ Fill up the messages with a repeating pattern, this is useful
+ during debugging.
+
Thu Apr 05 10:36:57 2001 Carlos O'Ryan <coryan@uci.edu>
* tao/Queued_Message.h:
- Remove the declaration of the done() method, the semantics were
- not clear and it was removed a couple of iterations ago.
+ Remove the declaration of the done() method, the semantics were
+ not clear and it was removed a couple of iterations ago.
* tao/Block_Flushing_Strategy.cpp:
* tao/Reactive_Flushing_Strategy.cpp:
- Wait until all_data_sent() returns.
+ Wait until all_data_sent() returns.
* tao/Transport.cpp:
- Cleanup handle_output() no need to loop, the drain_queue()
- method does that.
- After trying to send a message and blocking the send_message_i()
- method was not updating the new Asynch_Queued_Message with the
- number of bytes sent.
- drain_queue() loop was too complicated for its own sake.
-
- * tests/Big_Oneways/Session.h:
+ Cleanup handle_output() no need to loop, the drain_queue()
+ method does that.
+ After trying to send a message and blocking the send_message_i()
+ method was not updating the new Asynch_Queued_Message with the
+ number of bytes sent.
+ drain_queue() loop was too complicated for its own sake.
+
+ * tests/Big_Oneways/Session.h:
* tests/Big_Oneways/Test.idl:
* tests/Big_Oneways/Session.cpp:
- Add methods to prime the connections among multiple clients.
+ Add methods to prime the connections among multiple clients.
* tests/Big_Oneways/server.cpp:
- Increase timeout for initial session registration. Important
- for manual executions.
+ Increase timeout for initial session registration. Important
+ for manual executions.
Wed Apr 4 10:53:27 2001 Carlos O'Ryan <coryan@uci.edu>
diff --git a/TAO/tao/GIOP_Message_Handler.cpp b/TAO/tao/GIOP_Message_Handler.cpp
index c24fb4b9334..5df3551a03f 100644
--- a/TAO/tao/GIOP_Message_Handler.cpp
+++ b/TAO/tao/GIOP_Message_Handler.cpp
@@ -509,7 +509,7 @@ TAO_GIOP_Message_Handler::read_messages (TAO_Transport *transport)
return -1;
}
- if (TAO_debug_level > 6)
+ if (TAO_debug_level == 2)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - GIOP_Message_Handler::read_messages"
diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp
index 9e6058c32aa..2970dc187ae 100644
--- a/TAO/tao/Invocation.cpp
+++ b/TAO/tao/Invocation.cpp
@@ -387,7 +387,7 @@ TAO_GIOP_Invocation::prepare_header (CORBA::Octet response_flags,
// Send request.
int
-TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip,
+TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
@@ -399,18 +399,11 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip,
TAO_INVOKE_EXCEPTION);
}
- // @@ Alex: the <is_roundtrip> flag will be tricky when we move to
- // AMI: now it is used both to indicate the the CORBA request in
- // a twoway and that the send_request() operation should block.
- // Even for oneways: with AMI it is possible to wait for a
- // response (empty) for oneways, just to make sure that they
- // arrive, there are policies to control that.
-
int result =
this->transport_->send_request (this->stub_,
this->orb_core_,
this->out_stream_,
- is_roundtrip,
+ is_synchronous,
this->max_wait_time_);
//
@@ -618,8 +611,7 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request,
}
// Just send the request, without trying to wait for the reply.
- int retval = TAO_GIOP_Invocation::invoke (1,
- ACE_TRY_ENV);
+ int retval = TAO_GIOP_Invocation::invoke (1, ACE_TRY_ENV);
ACE_CHECK_RETURN (retval);
if (retval != TAO_INVOKE_OK)
@@ -855,8 +847,7 @@ TAO_GIOP_Twoway_Invocation::invoke (TAO_Exception_Data *excepts,
{
TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_INVOCATION_INVOKE_START);
- int retval = this->invoke_i (0,
- ACE_TRY_ENV);
+ int retval = this->invoke_i (0, ACE_TRY_ENV);
ACE_CHECK_RETURN (retval);
// A TAO_INVOKE_EXCEPTION status, but no exception raised means that
@@ -973,18 +964,15 @@ int
TAO_GIOP_Oneway_Invocation::invoke (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- if (this->sync_scope_ == TAO::SYNC_WITH_TRANSPORT
- || this->sync_scope_ == TAO::SYNC_NONE
+ if (this->sync_scope_ == TAO::SYNC_NONE
|| this->sync_scope_ == TAO::SYNC_EAGER_BUFFERING
|| this->sync_scope_ == TAO::SYNC_DELAYED_BUFFERING)
{
- return TAO_GIOP_Invocation::invoke (0,
- ACE_TRY_ENV);
+ return TAO_GIOP_Invocation::invoke (0, ACE_TRY_ENV);
}
- int retval = this->invoke_i (0,
- ACE_TRY_ENV);
- ACE_CHECK_RETURN (retval);
+ int retval = this->invoke_i (0, ACE_TRY_ENV);
+ ACE_CHECK_RETURN (TAO_INVOKE_EXCEPTION);
// A TAO_INVOKE_EXCEPTION status, but no exception raised means that
// we have a user exception.
@@ -1042,8 +1030,7 @@ TAO_GIOP_Locate_Request_Invocation::invoke (CORBA::Environment &ACE_TRY_ENV)
TAO_INVOKE_EXCEPTION);
}
- CORBA::ULong locate_status = this->invoke_i (1,
- ACE_TRY_ENV);
+ CORBA::ULong locate_status = this->invoke_i (1, ACE_TRY_ENV);
ACE_CHECK_RETURN (TAO_INVOKE_EXCEPTION);
switch (locate_status)
diff --git a/TAO/tao/Invocation.h b/TAO/tao/Invocation.h
index 3296c919009..a031975d643 100644
--- a/TAO/tao/Invocation.h
+++ b/TAO/tao/Invocation.h
@@ -186,12 +186,16 @@ protected:
/**
* Returns TAO_INVOKE_RESTART if the write call failed and the
* request must be re-attempted.
- * @par
+ *
+ * @param is_synchronous If set invoke() does not return until the
+ * message is completely delivered to the underlying
+ * transport mechanism, or an error is detected.
+ *
* Notice that the same profile is tried again because it may be
* that the server closed the connection simply to release
* resources.
*/
- int invoke (CORBA::Boolean is_roundtrip,
+ int invoke (CORBA::Boolean is_synchronous,
CORBA_Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException));
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 814bd170f97..68893083f9a 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -155,12 +155,16 @@ TAO_Transport::provide_handle (ACE_Handle_Set &handle_set)
}
static void
-dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer)
+dump_iov (iovec *iov, int iovcnt, int id,
+ size_t current_transfer,
+ const char *location)
{
+ ACE_Log_Msg::instance ()->acquire ();
+
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_block_chain"
+ "TAO (%P|%t) - Transport[%d]::%s"
" sending %d buffers\n",
- id, iovcnt));
+ id, location, iovcnt));
for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
{
size_t iov_len = iov[i].iov_len;
@@ -170,28 +174,36 @@ dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer)
iov_len = current_transfer;
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_block_chain"
+ "TAO (%P|%t) - Transport[%d]::%s"
" buffer %d/%d has %d bytes\n",
- id,
+ id, location,
i, iovcnt,
iov_len));
+
size_t len;
for (size_t offset = 0; offset < iov_len; offset += len)
{
+ char header[1024];
+ ACE_OS::sprintf (header,
+ "TAO - Transport[%d]::%s (%d/%d)\n",
+ id, location, offset, iov_len);
+
len = iov_len - offset;
if (len > 512)
len = 512;
ACE_HEX_DUMP ((LM_DEBUG,
ACE_static_cast(char*,iov[i].iov_base) + offset,
len,
- "TAO (%P|%t) - Transport::send_message_block_chain "));
+ header));
}
current_transfer -= iov_len;
}
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_mesage_block_chain"
+ "TAO (%P|%t) - Transport[%d]::%s"
" end of data\n",
- id));
+ id, location));
+
+ ACE_Log_Msg::instance ()->release ();
}
int
@@ -230,9 +242,10 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block,
ssize_t result =
this->send (iov, iovcnt, current_transfer, timeout);
- if (TAO_debug_level > 6)
+ if (TAO_debug_level == 2)
{
- dump_iov (iov, iovcnt, this->id (), current_transfer);
+ dump_iov (iov, iovcnt, this->id (),
+ current_transfer, "send_message_block_chain");
}
// Add to total bytes transferred.
@@ -260,9 +273,10 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block,
ssize_t result =
this->send (iov, iovcnt, current_transfer, timeout);
- if (TAO_debug_level > 6)
+ if (TAO_debug_level == 2)
{
- dump_iov (iov, iovcnt, this->id (), current_transfer);
+ dump_iov (iov, iovcnt, this->id (),
+ current_transfer, "send_message_block_chain");
}
// Add to total bytes transferred.
@@ -279,162 +293,155 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block,
int
TAO_Transport::send_message_i (TAO_Stub *stub,
- int twoway_flag,
+ int is_synchronous,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time)
{
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
-
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
- int queue_empty = (this->head_ == 0);
+ if (is_synchronous)
+ {
+ return this->send_synchronous_message_i (stub,
+ message_block,
+ max_wait_time);
+ }
// Let's figure out if the message should be queued without trying
// to send first:
- int must_queue = 0;
- if (this->head_ != 0)
- must_queue = 1;
- else if (!twoway_flag
- && stub->sync_strategy ().must_queue (queue_empty))
- {
- must_queue = 1;
- }
+ int try_sending_first = 1;
- if (must_queue)
+ int queue_empty = (this->head_ == 0);
+
+ if (!queue_empty)
+ try_sending_first = 0;
+ else if (stub->sync_strategy ().must_queue (queue_empty))
+ try_sending_first = 0;
+
+ size_t byte_count = 0;
+ ssize_t n;
+ if (try_sending_first)
{
- // ... simply queue the message ...
+ // ... in this case we must try to send the message first ...
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::send_message_i, "
- "message is queued\n",
+ "trying to send the message\n",
this->id ()));
}
- TAO_Queued_Message *queued_message = 0;
- ACE_NEW_RETURN (queued_message,
- TAO_Asynch_Queued_Message (message_block),
- -1);
- queued_message->push_back (this->head_, this->tail_);
+ // @@ I don't think we want to hold the mutex here, however if
+ // we release it we need to recheck the status of the transport
+ // after we return... once I understand the final form for this
+ // code I will re-visit this decision
+ n = this->send_message_block_chain (message_block,
+ byte_count,
+ max_wait_time);
+ if (n == 0)
+ return -1; // EOF
+ else if (n == -1)
+ {
+ // ... if this is just an EWOULDBLOCK we must schedule the
+ // message for later, if it is ETIME we still have to send
+ // the complete message, because cutting off the message at
+ // this point will destroy the synchronization with the
+ // server ...
+ if (errno != EWOULDBLOCK && errno != ETIME)
+ {
+ return -1;
+ }
+ }
- if (this->must_flush_queue_i (stub))
+ // ... let's figure out if the complete message was sent ...
+ if (message_block->total_length () == byte_count)
{
- ace_mon.release ();
- int result = flushing_strategy->flush_message (this,
- this->tail_);
- return result;
+ // Done, just return. Notice that there are no allocations
+ // or copies up to this point (though some fancy calling
+ // back and forth).
+ // This is the common case for the critical path, it should
+ // be fast.
+ return 0;
}
- return 0;
}
- // ... in this case we must try to send the message first ...
+ // ... either the message must be queued or we need to queue it
+ // because it was not completely sent out ...
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::send_message_i, "
- "trying to send the message\n",
+ "message is queued\n",
this->id ()));
}
- size_t byte_count;
+ TAO_Queued_Message *queued_message = 0;
+ ACE_NEW_RETURN (queued_message,
+ TAO_Asynch_Queued_Message (message_block),
+ -1);
+ queued_message->bytes_transferred (byte_count);
+ queued_message->push_back (this->head_, this->tail_);
- // @@ I don't think we want to hold the mutex here, however if
- // we release it we need to recheck the status of the transport
- // after we return... once I understand the final form for this
- // code I will re-visit this decision
- ssize_t n = this->send_message_block_chain (message_block,
- byte_count,
- max_wait_time);
- if (n == 0)
- return -1; // EOF
- else if (n == -1)
+ // ... if the queue is full we need to activate the output on the
+ // queue ...
+ if (this->must_flush_queue_i (stub))
{
- // ... if this is just an EWOULDBLOCK we must schedule the
- // message for later ...
- if (errno != EWOULDBLOCK)
- {
- return -1;
- }
+ this->orb_core ()->flushing_strategy ()->schedule_output (this);
}
- // ... let's figure out if the complete message was sent ...
- if (message_block->total_length () == byte_count)
- {
- // Done, just return. Notice that there are no allocations
- // or copies up to this point (though some fancy calling
- // back and forth).
- // This is the common case for the critical path, it should
- // be fast.
- return 0;
- }
+ // ... in any case, check for timeouts and report them to the
+ // application ...
+ if (max_wait_time != 0 && n == -1 && errno == ETIME)
+ return -1;
- // ... the message was only partially sent, schedule reactive
- // output...
- flushing_strategy->schedule_output (this);
+ return 0;
+}
- // ... and set it as the current message ...
- if (twoway_flag)
- {
- // ... we are going to block, so there is no need to clone
- // the message block...
- // @@ It seems wasteful to allocate a TAO_Queued_Message in
- // this case, but it is simpler to do it this way.
- TAO_Synch_Queued_Message synch_message (message_block);
-
- synch_message.bytes_transferred (byte_count);
- synch_message.push_back (this->head_, this->tail_);
-
- // Release the mutex, other threads may modify the queue as we
- // block for a long time writing out data.
- int result;
- {
- ace_mon.release ();
- result = flushing_strategy->flush_message (this,
- &synch_message);
-
- ace_mon.acquire ();
- }
- ACE_ASSERT (synch_message.next () == 0);
- ACE_ASSERT (synch_message.prev () == 0);
- synch_message.destroy ();
- return result;
- }
+int
+TAO_Transport::send_synchronous_message_i (TAO_Stub *stub,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
+{
+ // We are going to block, so there is no need to clone
+ // the message block.
+ TAO_Synch_Queued_Message synch_message (message_block);
- TAO_Queued_Message *queued_message = 0;
- ACE_NEW_RETURN (queued_message,
- TAO_Asynch_Queued_Message (message_block),
- -1);
+ synch_message.push_back (this->head_, this->tail_);
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- " queued anyway, %d bytes sent\n",
- this->id (),
- byte_count));
- }
+ int n = this->drain_queue_i ();
+ if (n == -1)
+ return -1; // Error while sending...
+ else if (n == 1)
+ return 1; // Empty queue, message was sent..
- // ... insert at the head of the queue, we can use push_back()
- // because the queue is empty ...
+ ACE_ASSERT (n == 0); // Some data sent, but data remains.
- queued_message->bytes_transferred (byte_count);
- queued_message->push_back (this->head_, this->tail_);
+ if (synch_message.all_data_sent ())
+ return 1;
- // ... this is not a twoway. We must check if the buffering
- // constraints have been reached, if so, then we should start
- // flushing out data....
+ // @todo: Check for timeouts!
+ // if (max_wait_time != 0 && errno == ETIME) return -1;
- if (this->must_flush_queue_i (stub))
- {
- ace_mon.release ();
- int result = flushing_strategy->flush_message (this,
- this->tail_);
- return result;
- }
- return 0;
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+ (void) flushing_strategy->schedule_output (this);
+
+ // Release the mutex, other threads may modify the queue as we
+ // block for a long time writing out data.
+ int result;
+ {
+ typedef ACE_Reverse_Lock<TAO_SYNCH_MUTEX> TAO_REVERSE_SYNCH_MUTEX;
+ TAO_REVERSE_SYNCH_MUTEX reverse (this->queue_mutex_);
+
+ ACE_GUARD_RETURN (TAO_REVERSE_SYNCH_MUTEX, ace_mon, reverse, -1);
+ result = flushing_strategy->flush_message (this,
+ &synch_message);
+
+ }
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
+ return result;
}
int
@@ -547,18 +554,29 @@ TAO_Transport::make_idle (void)
void
TAO_Transport::close_connection (void)
{
- ACE_MT (ACE_GUARD (ACE_Lock,
- guard,
- *this->handler_lock_));
+ ACE_Event_Handler *eh = 0;
+ {
+ ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
- // Call handle close on the handler.
- // The event handler is as common as we can get
- ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh)
- eh->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::ALL_EVENTS_MASK);
+ eh = this->event_handler_i ();
+
+ this->transition_handler_state_i ();
+
+ if (eh == 0)
+ return;
+ }
+
+ // Close the underlying connection, it is enough to get an
+ // Event_Handler pointer to do this, so we can factor out the code
+ // in the base TAO_Transport class.
+ (void) eh->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::ALL_EVENTS_MASK);
// Purge the entry
+ // @todo This is redundant, handle_close() eventually calls
+ // this->connection_handler_closing(), that performs the same
+ // work, for some reason they hold the mutex while they do
+ // that work though.
this->orb_core_->transport_cache ().purge_entry (this->cache_map_entry_);
for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
@@ -580,11 +598,14 @@ TAO_Transport::send (iovec *iov, int iovcnt,
// if there's no associated event handler, then we act like a null transport
if (this->event_handler_i () == 0)
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) transport %d (tag=%d) send() ")
- ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"),
- this->id (),
- this->tag_));
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) transport %d (tag=%d) send() ")
+ ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"),
+ this->id (),
+ this->tag_));
+ }
errno = ENOENT;
return -1;
}
@@ -716,6 +737,12 @@ TAO_Transport::drain_queue (void)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
+ return this->drain_queue_i ();
+}
+
+int
+TAO_Transport::drain_queue_i (void)
+{
if (this->head_ == 0)
return 1;
@@ -743,6 +770,12 @@ TAO_Transport::drain_queue (void)
ssize_t retval =
this->send (iov, iovcnt, byte_count);
+ if (TAO_debug_level == 2)
+ {
+ dump_iov (iov, iovcnt, this->id (),
+ byte_count, "drain_queue_i");
+ }
+
// ... now we need to update the queue, removing elements
// that have been sent, and updating the last element if it
// was only partially sent ...
@@ -775,6 +808,12 @@ TAO_Transport::drain_queue (void)
ssize_t retval =
this->send (iov, iovcnt, byte_count);
+ if (TAO_debug_level == 2)
+ {
+ dump_iov (iov, iovcnt, this->id (),
+ byte_count, "drain_queue_i");
+ }
+
this->cleanup_queue (byte_count);
iovcnt = 0;
@@ -788,6 +827,7 @@ TAO_Transport::drain_queue (void)
return 0;
return -1;
}
+ ACE_ASSERT (byte_count != 0);
if (this->head_ == 0)
return 1;
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 6e296a72fe5..e132d080ceb 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -507,7 +507,7 @@ public:
virtual int send_request (TAO_Stub *stub,
TAO_ORB_Core *orb_core,
TAO_OutputCDR &stream,
- int twoway,
+ int is_synchronous,
ACE_Time_Value *max_time_wait) = 0;
/// This is a request for the transport object to write a request
@@ -536,7 +536,7 @@ public:
// @@ lockme
virtual int send_message (TAO_OutputCDR &stream,
TAO_Stub *stub = 0,
- int twoway = 1,
+ int is_synchronous = 1,
ACE_Time_Value *max_time_wait = 0) = 0;
/// Callback to read incoming data
@@ -639,15 +639,29 @@ public:
size_t &bytes_transferred,
ACE_Time_Value *max_wait_time = 0);
- /// Sent the contents of <message_block>, blocking if required by
+ /// Sent the contents of <message_block>
+ /**
+ * @todo This method name sucks, but send_message() was already
+ * taken by other silly methods!
+ *
+ * @param stub The object reference used for this operation, useful
+ * to obtain the current policies.
+ * @param is_synchronous If set this method will block until the
+ * operation is completely written on the wire
+ * @param message_block The CDR encapsulation of the GIOP message
+ * that must be sent. The message may consist of
+ * multiple Message Blocks chained through the cont()
+ * field.
+ * @param max_wait_time The maximum time that the operation can
+ * block, used in the implementation of timeouts.
+ *
+ */
/// the twoway flag or by the current policies in the stub.
int send_message_i (TAO_Stub *stub,
- int twoway_flag,
+ int is_synchronous,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time);
- // TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *& cache_map_entry (void);
-
/// Cache management
void mark_invalid (void);
@@ -661,7 +675,7 @@ public:
int cancel_output (void);
private:
- /// Try to send the current message.
+ /// Send some of the data in the queue.
/**
* As the outgoing data is drained this method is invoked to send as
* much of the current message as possible.
@@ -671,6 +685,9 @@ private:
*/
int drain_queue (void);
+ /// Implement drain_queue() assuming the lock is held
+ int drain_queue_i (void);
+
/// Cleanup the queue.
/**
* Exactly <byte_count> bytes have been sent, the queue must be
@@ -686,6 +703,12 @@ private:
/// Check if the buffering constraints have been reached
int must_flush_queue_i (TAO_Stub *stub);
+ /// Send a synchronous message, i.e. block until the message is on
+ /// the wire
+ int send_synchronous_message_i (TAO_Stub *stub,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
/// Prohibited
ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&))
ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&))
diff --git a/TAO/tests/Big_Oneways/Session.cpp b/TAO/tests/Big_Oneways/Session.cpp
index 409a01595b0..0ce9bd9f2ce 100644
--- a/TAO/tests/Big_Oneways/Session.cpp
+++ b/TAO/tests/Big_Oneways/Session.cpp
@@ -47,6 +47,11 @@ Session::svc (void)
Test::Payload payload (this->payload_size_);
payload.length (this->payload_size_);
+ for (CORBA::ULong j = 0; j != this->payload_size_; ++j)
+ {
+ payload[j] = j % 256;
+ }
+
// Get the number of peers just once.
CORBA::ULong session_count =
this->other_sessions_.length ();
@@ -115,6 +120,7 @@ Session::validate_connections (CORBA::Environment &ACE_TRY_ENV)
ACE_CATCHANY {} ACE_ENDTRY;
}
}
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) connections are ready\n"));
}
void