summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-06-14 18:13:49 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-06-14 18:13:49 +0000
commit314fcbfe47cc316aa92178caa98efeaa512caa0d (patch)
tree429110387dde74ee0b5ce0d303596b3a8dd8fee0 /TAO/tao/Transport.cpp
parentcf804eb57c0c927ce04ffc83de80cf589f71cc8f (diff)
downloadATCD-314fcbfe47cc316aa92178caa98efeaa512caa0d.tar.gz
ChangeLogTag:Fri Jun 14 13:58:56 2002 Carlos O'Ryan <coryan@atdesk.com>
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp650
1 files changed, 355 insertions, 295 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index ea2de3ca725..13298f4575b 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -104,27 +104,21 @@ TAO_Transport::~TAO_Transport (void)
delete this->handler_lock_;
- TAO_Queued_Message *i = this->head_;
- while (i != 0)
+ while (this->head_ != 0)
{
- // @@ This is a good point to insert a flag to indicate that a
- // CloseConnection message was successfully received.
- i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED);
-
- TAO_Queued_Message *tmp = i;
- i = i->next ();
-
- tmp->destroy ();
+ TAO_Queued_Message *i = this->head_;
+ this->head_ = i->next ();
+ i->destroy ();
}
- // Avoid making the call if we can. This may be redundant, unless
- // someone called handle_close() on the connection handler from
- // outside the TAO_Transport.
- if (this->cache_map_entry_ != 0)
- {
- this->orb_core_->lane_resources ().transport_cache ().purge_entry (
+ // Avoid making the call if we can. This may be redundant, unless
+ // someone called handle_close() on the connection handler from
+ // outside the TAO_Transport.
+ if (this->cache_map_entry_ != 0)
+ {
+ this->orb_core_->lane_resources ().transport_cache ().purge_entry (
this->cache_map_entry_);
- }
+ }
}
int
@@ -186,8 +180,8 @@ dump_iov (iovec *iov, int iovcnt, int id,
ACE_Log_Msg::instance ()->acquire ();
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::%s"
- " sending %d buffers\n",
+ "TAO (%P|%t) - Transport[%d]::%s, "
+ "sending %d buffers\n",
id, location, iovcnt));
for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
{
@@ -198,8 +192,8 @@ dump_iov (iovec *iov, int iovcnt, int id,
iov_len = current_transfer;
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::%s"
- " buffer %d/%d has %d bytes\n",
+ "TAO (%P|%t) - Transport[%d]::%s, "
+ "buffer %d/%d has %d bytes\n",
id, location,
i, iovcnt,
iov_len));
@@ -209,10 +203,10 @@ dump_iov (iovec *iov, int iovcnt, int id,
{
ACE_TCHAR header[1024];
ACE_OS::sprintf (header,
- ACE_LIB_TEXT("TAO - Transport[%d]::%s (")
- ACE_SIZE_T_FORMAT_SPECIFIER ACE_LIB_TEXT("/")
- ACE_SIZE_T_FORMAT_SPECIFIER ACE_LIB_TEXT(")\n"),
- id, ACE_TEXT_CHAR_TO_TCHAR(location), offset, iov_len);
+ "TAO - Transport[%d]::%s ("
+ ACE_SIZE_T_FORMAT_SPECIFIER "/"
+ ACE_SIZE_T_FORMAT_SPECIFIER ")\n",
+ id, location, offset, iov_len);
len = iov_len - offset;
if (len > 512)
@@ -225,9 +219,9 @@ dump_iov (iovec *iov, int iovcnt, int id,
current_transfer -= iov_len;
}
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::%s")
- ACE_LIB_TEXT(" end of data\n"),
- id, ACE_TEXT_CHAR_TO_TCHAR(location)));
+ "TAO (%P|%t) - Transport[%d]::%s, "
+ "end of data\n",
+ id, location));
ACE_Log_Msg::instance ()->release ();
}
@@ -239,7 +233,7 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
- if (this->check_event_handler_i ("TAO_Transport::send_message_block_chain") == -1)
+ if (this->check_event_handler_i ("Transport::send_message_block_chain") == -1)
return -1;
return this->send_message_block_chain_i (mb,
@@ -289,144 +283,27 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
}
int
-TAO_Transport::send_message_i (TAO_Stub *stub,
- int is_synchronous,
- const ACE_Message_Block *message_block,
- ACE_Time_Value *max_wait_time)
+TAO_Transport::send_message_shared (TAO_Stub *stub,
+ int is_synchronous,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
{
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
-
- if (this->check_event_handler_i ("TAO_Transport::send_message_i") == -1)
- return -1;
-
- if (is_synchronous)
- {
- return this->send_synchronous_message_i (message_block,
- max_wait_time);
- }
-
- // Let's figure out if the message should be queued without trying
- // to send first:
- int try_sending_first = 1;
-
- int queue_empty = (this->head_ == 0);
-
- if (!queue_empty)
- try_sending_first = 0;
- else if (stub->sync_strategy ().must_queue (queue_empty))
- try_sending_first = 0;
-
- ssize_t n;
-
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
-
- if (try_sending_first)
- {
- size_t byte_count = 0;
- // ... in this case we must try to send the message first ...
-
- size_t total_length = message_block->total_length ();
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ")
- ACE_LIB_TEXT("trying to send the message (ml = %d)\n"),
- this->id (), total_length));
- }
-
- // @@ I don't think we want to hold the mutex here, however if
- // we release it we need to recheck the status of the transport
- // after we return... once I understand the final form for this
- // code I will re-visit this decision
- n = this->send_message_block_chain_i (message_block,
- byte_count,
- max_wait_time);
- if (n == -1)
- {
- // ... if this is just an EWOULDBLOCK we must schedule the
- // message for later, if it is ETIME we still have to send
- // the complete message, because cutting off the message at
- // this point will destroy the synchronization with the
- // server ...
- if (errno != EWOULDBLOCK && errno != ETIME)
- {
- return -1;
- }
- }
-
- // ... let's figure out if the complete message was sent ...
- if (total_length == byte_count)
- {
- // Done, just return. Notice that there are no allocations
- // or copies up to this point (though some fancy calling
- // back and forth).
- // This is the common case for the critical path, it should
- // be fast.
- return 0;
- }
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ")
- ACE_LIB_TEXT("partial send %d / %d bytes\n"),
- this->id (), byte_count, total_length));
- }
-
- // ... part of the data was sent, need to figure out what piece
- // of the message block chain must be queued ...
- while (message_block != 0 && message_block->length () == 0)
- message_block = message_block->cont ();
-
- // ... at least some portion of the message block chain should
- // remain ...
- ACE_ASSERT (message_block != 0);
- }
-
- // ... either the message must be queued or we need to queue it
- // because it was not completely sent out ...
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ")
- ACE_LIB_TEXT("message is queued\n"),
- this->id ()));
- }
-
- TAO_Queued_Message *queued_message = 0;
- ACE_NEW_RETURN (queued_message,
- TAO_Asynch_Queued_Message (message_block),
- -1);
- queued_message->push_back (this->head_, this->tail_);
-
- // ... if the queue is full we need to activate the output on the
- // queue ...
- int must_flush = 0;
- int constraints_reached =
- this->check_buffering_constraints_i (stub,
- must_flush);
-
- // ... but we also want to activate it if the message was partially
- // sent.... Plus, when we use the blocking flushing strategy the
- // queue is flushed as a side-effect of 'schedule_output()'
-
- if (constraints_reached || try_sending_first)
- {
- (void) flushing_strategy->schedule_output (this);
- }
+ int r;
+ {
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
- if (must_flush)
- {
- typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
- TAO_REVERSE_LOCK reverse (*this->handler_lock_);
- ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+ if (this->check_event_handler_i ("Transport::send_message_shared") == -1)
+ return -1;
- (void) flushing_strategy->flush_transport (this);
- }
+ r = this->send_message_shared_i (stub, is_synchronous,
+ message_block, max_wait_time);
+ }
+ if (r == -1)
+ {
+ this->close_connection ();
+ }
- return 0;
+ return r;
}
int
@@ -517,8 +394,9 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
if (TAO_debug_level > 0)
{
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT("TAO (%P|%t) TAO_Transport::send_synchronous_message_i, ")
- ACE_LIB_TEXT("error while flushing message %p\n"), ""));
+ "TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, "
+ "error while flushing message %p\n",
+ this->id (), ""));
}
return -1;
@@ -552,22 +430,20 @@ TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
ACE_NOTSUP_RETURN (-1);
}
-
void
TAO_Transport::connection_handler_closing (void)
{
- {
- ACE_MT (ACE_GUARD (ACE_Lock,
- guard,
- *this->handler_lock_));
+ // The connection has closed, we must invalidate the handler to
+ // ensure that any attempt to use this transport results in an
+ // error. Basically all the other methods in the Transport
+ // cooperate via check_event_handler_i()
- this->transition_handler_state_i ();
- }
- // Can't hold the lock while we release, b/c the release could
- // invoke the destructor!
+ (void) this->invalidate_event_handler ();
+ this->send_connection_closed_notifications ();
- // This should be the last thing we do here
- TAO_Transport::release(this);
+ // Can't hold the lock while we release, b/c the release could
+ // invoke the destructor! This should be the last thing we do here
+ TAO_Transport::release (this);
}
TAO_Transport*
@@ -594,14 +470,21 @@ TAO_Transport::release (TAO_Transport* transport)
else if (count < 0)
{
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("(%P|%t) TAO_Transport::release, ")
- ACE_LIB_TEXT ("reference countis less than zero: %d\n"),
- count));
+ "TAO (%P|%t) - Transport[%d]::release, "
+ "reference count is less than zero: %d\n",
+ transport->id (), count));
ACE_OS::abort ();
}
}
}
+// @@ TODO Somebody needs to go throught this code and change those
+// stupidly long calls sequences:
+//
+// this->orb_core_->lane_resources ().transport_cache ().
+//
+// to a single inline function!!
+
int
TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
{
@@ -637,36 +520,40 @@ TAO_Transport::make_idle (void)
void
TAO_Transport::close_connection (void)
{
- this->close_connection_i ();
-
- // Purge the entry
- if (this->cache_map_entry_ != 0)
- {
- this->orb_core_->lane_resources ().transport_cache ().purge_entry (
- this->cache_map_entry_);
- }
+ ACE_Event_Handler * eh = this->invalidate_event_handler ();
+ this->close_connection_shared (1, eh);
}
-
void
TAO_Transport::close_connection_i (void)
{
- ACE_Event_Handler *eh = 0;
- {
- ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
+ ACE_Event_Handler * eh = this->invalidate_event_handler_i ();
+ this->close_connection_shared (1, eh);
+}
- eh = this->event_handler_i ();
+void
+TAO_Transport::close_connection_no_purge (void)
+{
+ ACE_Event_Handler * eh = this->invalidate_event_handler ();
+ this->close_connection_shared (0, eh);
+}
- this->transition_handler_state_i ();
+void
+TAO_Transport::close_connection_shared (int disable_purge,
+ ACE_Event_Handler * eh)
+{
+ // Purge the entry
+ if (!disable_purge && this->cache_map_entry_ != 0)
+ {
+ this->orb_core_->lane_resources ().transport_cache ().purge_entry (
+ this->cache_map_entry_);
+ }
- if (eh == 0)
+ if (eh == 0)
+ {
+ // The connection was already closed
return;
- }
-
- // Close the underlying connection, it is enough to get an
- // Event_Handler pointer to do this, so we can factor out the code
- // in the base TAO_Transport class.
-
+ }
// We first try to remove the handler from the reactor. After that
// we destroy the handler using handle_close (). The remove handler
@@ -683,10 +570,7 @@ TAO_Transport::close_connection_i (void)
(void) eh->handle_close (ACE_INVALID_HANDLE,
ACE_Event_Handler::ALL_EVENTS_MASK);
- for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
- {
- i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED);
- }
+ this->send_connection_closed_notifications ();
}
ssize_t
@@ -699,7 +583,7 @@ TAO_Transport::send (iovec *iov, int iovcnt,
*this->handler_lock_,
-1));
- if (this->check_event_handler_i ("TAO_Transport::send") == -1)
+ if (this->check_event_handler_i ("Transport::send") == -1)
return -1;
// now call the template method
@@ -716,7 +600,7 @@ TAO_Transport::recv (char *buffer,
*this->handler_lock_,
-1));
- if (this->check_event_handler_i ("TAO_Transport::recv") == -1)
+ if (this->check_event_handler_i ("Transport::recv") == -1)
return -1;
// now call the template method
@@ -736,8 +620,9 @@ TAO_Transport::generate_locate_request (
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("(%P|%t) Error in marshalling the \n")
- ACE_LIB_TEXT ("LocateRequest Header \n")));
+ "TAO (%P|%t) - Transport[%d]::generate_locate_request, "
+ "error while marshalling the LocateRequest header\n",
+ this->id ()));
return -1;
}
@@ -758,8 +643,9 @@ TAO_Transport::generate_request_header (
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("(%P|%t) Error in marshalling the \n")
- ACE_LIB_TEXT ("LocateRequest Header \n")));
+ "(%P|%t) - Transport[%d]::generate_request_header, "
+ "error while marshalling the Request header\n",
+ this->id()));
return -1;
}
@@ -775,7 +661,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::handle_input\n"),
+ "TAO (%P|%t) - Transport[%d]::handle_input\n",
this->id ()));
}
@@ -788,10 +674,11 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
{
if (TAO_debug_level > 2)
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) Transport::handle_input_i,")
- ACE_LIB_TEXT("error while parsing the head of the queue \n")));
+ "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "error while parsing the head of the queue\n",
+ this->id()));
- this->tms_->connection_closed ();
+ this->send_connection_closed_notifications ();
}
return retval;
}
@@ -838,7 +725,9 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
if (n <= 0)
{
if (n == -1)
- this->tms_->connection_closed ();
+ {
+ this->send_connection_closed_notifications ();
+ }
return n;
}
@@ -846,8 +735,9 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) Read [%d] bytes from transport [%d]\n"),
- n, this->id ()));
+ "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "read %d bytes\n",
+ this->id (), n));
}
// Set the write pointer in the stack buffer
@@ -864,8 +754,9 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
if (retval == -1 && TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport::handle_input_i ")
- ACE_LIB_TEXT("error while parsing and consolidating \n")));
+ "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "error while parsing and consolidating\n",
+ this->id ()));
}
return retval;
}
@@ -943,9 +834,11 @@ TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
{
if (TAO_debug_level > 2)
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("TAO (%P|%t) - error in incoming message \n")));
+ "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, "
+ "error in incoming message\n",
+ this->id ()));
- this->tms_->connection_closed ();
+ this->send_connection_closed_notifications ();
return -1;
}
}
@@ -984,7 +877,7 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n"),
+ "TAO (%P|%t) - Transport[%d]::consolidate_message\n",
this->id ()));
}
@@ -1013,12 +906,12 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Read [%d] bytes on attempt \n",
- n));
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "read %d bytes on attempt\n",
+ this->id(), n));
}
- if (n == 0 ||
- n == -1)
+ if (n == 0 || n == -1)
{
break;
}
@@ -1033,10 +926,11 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Trasport::consolidate_message,")
- ACE_LIB_TEXT("error while trying to consolidate \n")));
+ "TAO (%P|%t) - Trasport[%d]::consolidate_message, "
+ "error while trying to consolidate\n",
+ this->id ()));
}
- this->tms_->connection_closed ();
+ this->send_connection_closed_notifications ();
return -1;
}
@@ -1056,8 +950,8 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n")
- ACE_LIB_TEXT("queueing up the message \n"),
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "queueing up the message\n",
this->id ()));
}
@@ -1154,7 +1048,7 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_message_queue \n"),
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n",
this->id ()));
}
@@ -1187,8 +1081,9 @@ TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
if (TAO_debug_level)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Error while consolidating... \n"),
- ACE_LIB_TEXT("TAO (%P|%t) - .. part of the read message \n")));
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "error while consolidating, part of the read message\n",
+ this->id ()));
}
return retval;
}
@@ -1277,7 +1172,7 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n"),
+ "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n",
this->id ()));
}
@@ -1302,8 +1197,8 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n")
- ACE_LIB_TEXT(".............. extracting extra messages \n"),
+ "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, "
+ "extracting extra messages\n",
this->id ()));
}
@@ -1351,11 +1246,11 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_LIB_TEXT ("Close Connection Message recd \n")));
+ "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
+ "received CloseConnection message %p\n",
+ this->id(), ""));
- // Close the TMS
- this->tms_->connection_closed ();
+ this->send_connection_closed_notifications ();
// Return a "-1" so that the next stage can take care of
// closing connection and the necessary memory management.
@@ -1372,8 +1267,7 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
this,
qd) == -1)
{
- // Close the TMS
- this->tms_->connection_closed ();
+ this->send_connection_closed_notifications ();
// Return a "-1" so that the next stage can take care of
// closing connection and the necessary memory management.
@@ -1395,12 +1289,11 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_LIB_TEXT ("IIOP_Transport::process_message, ")
- ACE_LIB_TEXT ("process_reply_message ()")));
+ "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
+ "error in process_reply_message %p\n",
+ this->id (), ""));
- this->messaging_object ()->reset ();
- this->tms_->connection_closed ();
+ this->send_connection_closed_notifications ();
return -1;
}
@@ -1412,23 +1305,25 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
// every reply on this connection.
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("TAO (%P|%t) : IIOP_Transport::")
- ACE_LIB_TEXT ("process_message - ")
- ACE_LIB_TEXT ("dispatch reply failed\n")));
+ "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
+ "dispatch reply failed\n",
+ this->id ()));
- this->messaging_object ()->reset ();
- this->tms_->connection_closed ();
+ this->send_connection_closed_notifications ();
return -1;
}
}
else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
{
- // Ys, we print out all levels that we are closing the
- // connection.
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT("(%P|%t) Closing down the connection \n")),
- -1);
+ if (TAO_debug_level)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
+ "received MessageError, closing connection\n",
+ this->id ()),
+ -1);
+ }
}
// If not, just return back..
@@ -1489,7 +1384,7 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::process_queue_head \n"),
+ "TAO (%P|%t) - Transport[%d]::process_queue_head\n",
this->id ()));
}
@@ -1503,8 +1398,8 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO(%P|%t) - Transport[%d]::process_queue_head")
- ACE_LIB_TEXT(" the size of the queue is [%d] \n"),
+ "TAO(%P|%t) - Transport[%d]::process_queue_head, "
+ "the size of the queue is [%d]\n",
this->id (),
this->incoming_message_queue_.queue_length()));
}
@@ -1527,7 +1422,8 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::notify to Reactor\n"),
+ "TAO (%P|%t) - Transport[%d]::process_queue_header, "
+ "notify to Reactor\n",
this->id ()));
}
@@ -1544,8 +1440,9 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
// @@todo: need to think about what is the action that
// we can take when we get here.
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("TAO (%P|%t) - Transport::process_queue_head ")
- ACE_LIB_TEXT ("notify to the reactor failed.. \n")));
+ "TAO (%P|%t) - Transport[%d]::process_queue_head, "
+ "notify to the reactor failed..\n",
+ this->id ()));
}
}
}
@@ -1623,7 +1520,7 @@ TAO_Transport::schedule_output_i (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::schedule_output\n"),
+ "TAO (%P|%t) - Transport[%d]::schedule_output\n",
this->id ()));
}
@@ -1644,7 +1541,7 @@ TAO_Transport::cancel_output_i (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - Transport[%d]::cancel_output\n"),
+ "TAO (%P|%t) - Transport[%d]::cancel_output\n",
this->id ()));
}
@@ -1658,8 +1555,9 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::handle_timeout, ")
- ACE_LIB_TEXT("timer expired\n")));
+ "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, "
+ "timer expired\n",
+ this->id ()));
}
/// This is the only legal ACT in the current configuration....
@@ -1704,7 +1602,7 @@ TAO_Transport::drain_queue (void)
int
TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
{
- if (this->check_event_handler_i ("TAO_Transport::drain_queue_helper") == -1)
+ if (this->check_event_handler_i ("Transport::drain_queue_helper") == -1)
return -1;
size_t byte_count = 0;
@@ -1730,8 +1628,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_helper, ")
- ACE_LIB_TEXT("send() returns 0")));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
+ "send() returns 0",
+ this->id ()));
}
return -1;
}
@@ -1740,9 +1639,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_helper, ")
- ACE_LIB_TEXT("%p"),
- ACE_LIB_TEXT("send()")));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
+ "error during %p\n",
+ this->id (), "send_i()"));
}
if (errno == EWOULDBLOCK)
return 0;
@@ -1756,9 +1655,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_helper, ")
- ACE_LIB_TEXT("byte_count = %d, head_is_empty = %d\n"),
- byte_count, (this->head_ == 0)));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
+ "byte_count = %d, head_is_empty = %d\n",
+ this->id(), byte_count, (this->head_ == 0)));
}
return 1;
}
@@ -1790,9 +1689,9 @@ TAO_Transport::drain_queue_i (void)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_i, ")
- ACE_LIB_TEXT("helper retval = %d\n"),
- retval));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
+ "helper retval = %d\n",
+ this->id (), retval));
}
if (retval != 1)
return retval;
@@ -1814,9 +1713,9 @@ TAO_Transport::drain_queue_i (void)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::drain_queue_i, ")
- ACE_LIB_TEXT("helper retval = %d\n"),
- retval));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
+ "helper retval = %d\n",
+ this->id (), retval));
}
if (retval != 1)
return retval;
@@ -1853,9 +1752,9 @@ TAO_Transport::cleanup_queue (size_t byte_count)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::cleanup_queue, ")
- ACE_LIB_TEXT("byte_count = %d\n"),
- byte_count));
+ "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
+ "byte_count = %d\n",
+ this->id (), byte_count));
}
// Update the state of the first message
@@ -1864,9 +1763,9 @@ TAO_Transport::cleanup_queue (size_t byte_count)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("TAO (%P|%t) - TAO_Transport::cleanup_queue, ")
- ACE_LIB_TEXT("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
- byte_count, i->all_data_sent (),
+ "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
+ "after transfer, bc = %d, all_sent = %d, ml = %d\n",
+ this->id (), byte_count, i->all_data_sent (),
i->message_length ()));
}
@@ -1939,13 +1838,174 @@ TAO_Transport::report_invalid_event_handler (const char *caller)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("(%P|%t) transport %d (tag=%d) %s ")
- ACE_LIB_TEXT("no longer associated with handler, ")
- ACE_LIB_TEXT("returning -1 with errno = ENOENT\n"),
- this->id (),
- this->tag_,
- ACE_TEXT_CHAR_TO_TCHAR(caller)));
+ "(%P|%t) - Transport[%d]::report_invalid_event_handler"
+ "(%s) no longer associated with handler [tag=%d]\n",
+ this->id (), caller, this->tag_));
+ }
+}
+
+ACE_Event_Handler *
+TAO_Transport::invalidate_event_handler (void)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, 0));
+
+ return this->invalidate_event_handler_i ();
+}
+
+void
+TAO_Transport::send_connection_closed_notifications (void)
+{
+ while (this->head_ != 0)
+ {
+ TAO_Queued_Message *i = this->head_;
+
+ // @@ This is a good point to insert a flag to indicate that a
+ // CloseConnection message was successfully received.
+ i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED);
+
+ this->head_ = i->next ();
+
+ i->destroy ();
+ }
+
+ this->tms ()->connection_closed ();
+ this->messaging_object ()->reset ();
+}
+
+int
+TAO_Transport::send_message_shared_i (TAO_Stub *stub,
+ int is_synchronous,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
+{
+ if (is_synchronous)
+ {
+ return this->send_synchronous_message_i (message_block,
+ max_wait_time);
+ }
+
+ // Let's figure out if the message should be queued without trying
+ // to send first:
+ int try_sending_first = 1;
+
+ int queue_empty = (this->head_ == 0);
+
+ if (!queue_empty)
+ try_sending_first = 0;
+ else if (stub->sync_strategy ().must_queue (queue_empty))
+ try_sending_first = 0;
+
+ ssize_t n;
+
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+
+ if (try_sending_first)
+ {
+ size_t byte_count = 0;
+ // ... in this case we must try to send the message first ...
+
+ size_t total_length = message_block->total_length ();
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ "trying to send the message (ml = %d)\n",
+ this->id (), total_length));
+ }
+
+ // @@ I don't think we want to hold the mutex here, however if
+ // we release it we need to recheck the status of the transport
+ // after we return... once I understand the final form for this
+ // code I will re-visit this decision
+ n = this->send_message_block_chain_i (message_block,
+ byte_count,
+ max_wait_time);
+ if (n == -1)
+ {
+ // ... if this is just an EWOULDBLOCK we must schedule the
+ // message for later, if it is ETIME we still have to send
+ // the complete message, because cutting off the message at
+ // this point will destroy the synchronization with the
+ // server ...
+ if (errno != EWOULDBLOCK && errno != ETIME)
+ {
+ return -1;
+ }
+ }
+
+ // ... let's figure out if the complete message was sent ...
+ if (total_length == byte_count)
+ {
+ // Done, just return. Notice that there are no allocations
+ // or copies up to this point (though some fancy calling
+ // back and forth).
+ // This is the common case for the critical path, it should
+ // be fast.
+ return 0;
+ }
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ "partial send %d / %d bytes\n",
+ this->id (), byte_count, total_length));
+ }
+
+ // ... part of the data was sent, need to figure out what piece
+ // of the message block chain must be queued ...
+ while (message_block != 0 && message_block->length () == 0)
+ message_block = message_block->cont ();
+
+ // ... at least some portion of the message block chain should
+ // remain ...
+ ACE_ASSERT (message_block != 0);
}
+
+ // ... either the message must be queued or we need to queue it
+ // because it was not completely sent out ...
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ "message is queued\n",
+ this->id ()));
+ }
+
+ TAO_Queued_Message *queued_message = 0;
+ ACE_NEW_RETURN (queued_message,
+ TAO_Asynch_Queued_Message (message_block),
+ -1);
+ queued_message->push_back (this->head_, this->tail_);
+
+ // ... if the queue is full we need to activate the output on the
+ // queue ...
+ int must_flush = 0;
+ int constraints_reached =
+ this->check_buffering_constraints_i (stub,
+ must_flush);
+
+ // ... but we also want to activate it if the message was partially
+ // sent.... Plus, when we use the blocking flushing strategy the
+ // queue is flushed as a side-effect of 'schedule_output()'
+
+ if (constraints_reached || try_sending_first)
+ {
+ (void) flushing_strategy->schedule_output (this);
+ }
+
+ if (must_flush)
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+
+ (void) flushing_strategy->flush_transport (this);
+ }
+
+ return 0;
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)