summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp1674
1 files changed, 885 insertions, 789 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 2a1f66c5a78..989fa5624ab 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -1,26 +1,26 @@
// $Id$
-#include "tao/Transport.h"
-
-#include "tao/LF_Follower.h"
-#include "tao/Leader_Follower.h"
-#include "tao/Client_Strategy_Factory.h"
-#include "tao/Wait_Strategy.h"
-#include "tao/Transport_Mux_Strategy.h"
-#include "tao/Stub.h"
-#include "tao/Transport_Queueing_Strategies.h"
-#include "tao/Connection_Handler.h"
-#include "tao/Pluggable_Messaging.h"
-#include "tao/Synch_Queued_Message.h"
-#include "tao/Asynch_Queued_Message.h"
-#include "tao/Flushing_Strategy.h"
-#include "tao/Thread_Lane_Resources.h"
-#include "tao/Resume_Handle.h"
-#include "tao/Codeset_Manager.h"
-#include "tao/Codeset_Translator_Base.h"
-#include "tao/debug.h"
-#include "tao/CDR.h"
-#include "tao/ORB_Core.h"
+#include "Transport.h"
+
+#include "LF_Follower.h"
+#include "Leader_Follower.h"
+#include "Client_Strategy_Factory.h"
+#include "Wait_Strategy.h"
+#include "Transport_Mux_Strategy.h"
+#include "Stub.h"
+#include "Transport_Queueing_Strategies.h"
+#include "Connection_Handler.h"
+#include "Pluggable_Messaging.h"
+#include "Synch_Queued_Message.h"
+#include "Asynch_Queued_Message.h"
+#include "Flushing_Strategy.h"
+#include "Thread_Lane_Resources.h"
+#include "Resume_Handle.h"
+#include "Codeset_Manager.h"
+#include "Codeset_Translator_Base.h"
+#include "debug.h"
+#include "CDR.h"
+#include "ORB_Core.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/OS_NS_stdio.h"
@@ -34,7 +34,7 @@
//@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK
#if !defined (__ACE_INLINE__)
-# include "tao/Transport.inl"
+# include "Transport.inl"
#endif /* __ACE_INLINE__ */
@@ -55,7 +55,7 @@ dump_iov (iovec *iov, int iovcnt, size_t id,
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
ACE_TEXT ("sending %d buffers\n"),
- id, ACE_TEXT_CHAR_TO_TCHAR (location), iovcnt));
+ id, ACE_TEXT_TO_TCHAR_IN (location), iovcnt));
for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
{
@@ -70,7 +70,7 @@ dump_iov (iovec *iov, int iovcnt, size_t id,
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
ACE_TEXT ("buffer %d/%d has %d bytes\n"),
- id, ACE_TEXT_CHAR_TO_TCHAR(location),
+ id, ACE_TEXT_TO_TCHAR_IN(location),
i, iovcnt,
iov_len));
@@ -107,13 +107,11 @@ dump_iov (iovec *iov, int iovcnt, size_t id,
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
ACE_TEXT ("end of data\n"),
- id, ACE_TEXT_CHAR_TO_TCHAR(location)));
+ id, ACE_TEXT_TO_TCHAR_IN(location)));
ACE_Log_Msg::instance ()->release ();
}
-TAO_BEGIN_VERSIONED_NAMESPACE_DECL
-
TAO_Transport::TAO_Transport (CORBA::ULong tag,
TAO_ORB_Core *orb_core)
: tag_ (tag)
@@ -181,9 +179,6 @@ TAO_Transport::~TAO_Transport (void)
// By the time the destructor is reached here all the connection stuff
// *must* have been cleaned up.
-
- // The following assert is needed for the test "Bug_2494_Regression".
- // See the bugzilla bug #2494 for details.
ACE_ASSERT (this->head_ == 0);
ACE_ASSERT (this->cache_map_entry_ == 0);
@@ -290,7 +285,7 @@ TAO_Transport::register_handler (void)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
+ "TAO (%P|%t) - Transport[%d]::register_handler\n",
this->id ()));
}
@@ -332,8 +327,8 @@ TAO_Transport::generate_locate_request (
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
- ACE_TEXT ("error while marshalling the LocateRequest header\n"),
+ "TAO (%P|%t) - Transport[%d]::generate_locate_request, "
+ "error while marshalling the LocateRequest header\n",
this->id ()));
}
@@ -365,8 +360,8 @@ TAO_Transport::generate_request_header (
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) - Transport[%d]::generate_request_header, ")
- ACE_TEXT ("error while marshalling the Request header\n"),
+ "(%P|%t) - Transport[%d]::generate_request_header, "
+ "error while marshalling the Request header\n",
this->id()));
}
@@ -401,7 +396,7 @@ TAO_Transport::make_idle (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
+ "TAO (%P|%t) - Transport[%d]::make_idle\n",
this->id ()));
}
@@ -425,7 +420,7 @@ TAO_Transport::handle_output (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"),
+ "TAO (%P|%t) - Transport[%d]::handle_output\n",
this->id ()));
}
@@ -437,8 +432,8 @@ TAO_Transport::handle_output (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
- ACE_TEXT ("drain_queue returns %d/%d\n"),
+ "TAO (%P|%t) - Transport[%d]::handle_output, "
+ "drain_queue returns %d/%d\n",
this->id (),
retval, errno));
}
@@ -487,14 +482,21 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
if (n == -1)
{
synch_message.remove_from_list (this->head_, this->tail_);
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
return -1; // Error while sending...
}
else if (n == 1)
{
+ ACE_ASSERT (synch_message.all_data_sent ());
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
bytes_transferred = total_length;
return 1; // Empty queue, message was sent..
}
+ ACE_ASSERT (n == 0); // Some data sent, but data remains.
+
// Remove the temporary message from the queue...
synch_message.remove_from_list (this->head_, this->tail_);
@@ -514,7 +516,7 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
synch_message.push_back (this->head_, this->tail_);
- int const n =
+ int n =
this->send_synch_message_helper_i (synch_message,
max_wait_time);
@@ -523,6 +525,8 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
return n;
}
+ ACE_ASSERT (n == 0);
+
// @todo: Check for timeouts!
// if (max_wait_time != 0 && errno == ETIME) return -1;
TAO_Flushing_Strategy *flushing_strategy =
@@ -583,14 +587,21 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
if (TAO_debug_level > 0)
{
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
- ACE_TEXT ("error while flushing message - %m\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, "
+ "error while flushing message - %m\n",
+ this->id ()));
}
return -1;
}
+ else
+ {
+ ACE_ASSERT (synch_message.all_data_sent () != 0);
+ }
+
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
return 1;
}
@@ -605,7 +616,7 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
synch_message.push_back (this->head_,
this->tail_);
- int const n =
+ int n =
this->send_synch_message_helper_i (synch_message,
max_wait_time);
@@ -614,12 +625,14 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
return n;
}
+ ACE_ASSERT (n == 0);
+
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
- ACE_TEXT ("preparing to add to queue before leaving \n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::send_reply_message_i, "
+ "preparing to add to queue before leaving \n",
+ this->id ()));
}
// Till this point we shouldn't have any copying and that is the
@@ -638,19 +651,7 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
TAO_Flushing_Strategy *flushing_strategy =
this->orb_core ()->flushing_strategy ();
- int result = flushing_strategy->schedule_output (this);
-
- if (result == -1)
- {
- if (TAO_debug_level > 5)
- {
- ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
- "message_i dequeuing msg due to schedule_output "
- "failure\n", this->id ()));
- }
- msg->remove_from_list (this->head_, this->tail_);
- msg->destroy ();
- }
+ (void) flushing_strategy->schedule_output (this);
return 1;
}
@@ -665,15 +666,24 @@ TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_mess
if (n == -1)
{
synch_message.remove_from_list (this->head_, this->tail_);
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
return -1; // Error while sending...
}
else if (n == 1)
{
+ ACE_ASSERT (synch_message.all_data_sent ());
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
return 1; // Empty queue, message was sent..
}
+ ACE_ASSERT (n == 0); // Some data sent, but data remains.
+
if (synch_message.all_data_sent ())
{
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
return 1;
}
@@ -693,32 +703,11 @@ TAO_Transport::schedule_output_i (void)
ACE_Event_Handler *eh = this->event_handler_i ();
ACE_Reactor *reactor = eh->reactor ();
- // Check to see if our event handler is still registered with the
- // reactor. It's possible for another thread to have run close_connection()
- // since we last used the event handler.
- ACE_Event_Handler *found = reactor->find_handler (eh->get_handle ());
- if (found != eh)
- {
- if(TAO_debug_level > 3)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::schedule_output_i "
- "event handler not found in reactor, returning -1\n",
- this->id ()));
- }
- if (found)
- {
- found->remove_reference ();
- }
- return -1;
- }
- found->remove_reference ();
-
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::schedule_output_i\n",
+ this->id ()));
}
return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
@@ -733,8 +722,8 @@ TAO_Transport::cancel_output_i (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::cancel_output_i\n",
+ this->id ()));
}
return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
@@ -747,9 +736,9 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
- ACE_TEXT ("timer expired\n"),
- this->id ()));
+ "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, "
+ "timer expired\n",
+ this->id ()));
}
/// This is the only legal ACT in the current configuration....
@@ -776,7 +765,7 @@ int
TAO_Transport::drain_queue (void)
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
- int const retval = this->drain_queue_i ();
+ int retval = this->drain_queue_i ();
if (retval == 1)
{
@@ -799,7 +788,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
size_t byte_count = 0;
// ... send the message ...
- ssize_t const retval =
+ ssize_t retval =
this->send (iov, iovcnt, byte_count);
if (TAO_debug_level == 5)
@@ -819,9 +808,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
- ACE_TEXT ("send() returns 0\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
+ "send() returns 0\n",
+ this->id ()));
}
return -1;
}
@@ -830,9 +819,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
- ACE_TEXT ("error during %p\n"),
- this->id (), ACE_TEXT ("send()")));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
+ "error during %p\n",
+ this->id (), ACE_TEXT ("send()")));
}
if (errno == EWOULDBLOCK || errno == EAGAIN)
@@ -845,6 +834,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
// ... start over, how do we guarantee progress? Because if
// no bytes are sent send() can only return 0 or -1
+ ACE_ASSERT (byte_count != 0);
// Total no. of bytes sent for a send call
this->sent_byte_count_ += byte_count;
@@ -852,9 +842,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
- ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
- this->id(), byte_count, (this->head_ == 0)));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
+ "byte_count = %d, head_is_empty = %d\n",
+ this->id(), byte_count, (this->head_ == 0)));
}
return 1;
@@ -896,9 +886,9 @@ TAO_Transport::drain_queue_i (void)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
- ACE_TEXT ("helper retval = %d\n"),
- this->id (), retval));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
+ "helper retval = %d\n",
+ this->id (), retval));
}
if (retval != 1)
@@ -921,9 +911,9 @@ TAO_Transport::drain_queue_i (void)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
- ACE_TEXT ("helper retval = %d\n"),
- this->id (), retval));
+ "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
+ "helper retval = %d\n",
+ this->id (), retval));
}
if (retval != 1)
@@ -954,9 +944,9 @@ TAO_Transport::cleanup_queue_i ()
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
- ACE_TEXT ("cleaning up complete queue\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::cleanup_queue_i, "
+ "cleaning up complete queue\n",
+ this->id ()));
}
// Cleanup all messages
@@ -967,7 +957,7 @@ TAO_Transport::cleanup_queue_i ()
// @@ This is a good point to insert a flag to indicate that a
// CloseConnection message was successfully received.
i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
- this->orb_core_->leader_follower ());
+ this->orb_core_->leader_follower ());
i->remove_from_list (this->head_, this->tail_);
@@ -985,9 +975,9 @@ TAO_Transport::cleanup_queue (size_t byte_count)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
- ACE_TEXT ("byte_count = %d\n"),
- this->id (), byte_count));
+ "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
+ "byte_count = %d\n",
+ this->id (), byte_count));
}
// Update the state of the first message
@@ -996,10 +986,10 @@ TAO_Transport::cleanup_queue (size_t byte_count)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
- ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
- this->id (), byte_count, i->all_data_sent (),
- i->message_length ()));
+ "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
+ "after transfer, bc = %d, all_sent = %d, ml = %d\n",
+ this->id (), byte_count, i->all_data_sent (),
+ i->message_length ()));
}
// ... if all the data was sent the message must be removed from
@@ -1068,9 +1058,9 @@ TAO_Transport::report_invalid_event_handler (const char *caller)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
- ACE_TEXT ("(%s) no longer associated with handler [tag=%d]\n"),
- this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
+ "TAO (%P|%t) - Transport[%d]::report_invalid_event_handler"
+ "(%s) no longer associated with handler [tag=%d]\n",
+ this->id (), ACE_TEXT_TO_TCHAR_IN (caller), this->tag_));
}
}
@@ -1124,7 +1114,7 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
{
// Let's figure out if the message should be queued without trying
// to send first:
- bool try_sending_first = true;
+ bool try_sending_first = 1;
const bool queue_empty = (this->head_ == 0);
@@ -1148,9 +1138,9 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
- ACE_TEXT ("trying to send the message (ml = %d)\n"),
- this->id (), total_length));
+ "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, "
+ "trying to send the message (ml = %d)\n",
+ this->id (), total_length));
}
// @@ I don't think we want to hold the mutex here, however if
@@ -1172,10 +1162,10 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
if (TAO_debug_level > 0)
{
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
- ACE_TEXT ("fatal error in ")
- ACE_TEXT ("send_message_block_chain_i - %m\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, "
+ "fatal error in "
+ "send_message_block_chain_i - %m\n",
+ this->id ()));
}
return -1;
}
@@ -1195,9 +1185,9 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
- ACE_TEXT ("partial send %d / %d bytes\n"),
- this->id (), byte_count, total_length));
+ "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, "
+ "partial send %d / %d bytes\n",
+ this->id (), byte_count, total_length));
}
// ... part of the data was sent, need to figure out what piece
@@ -1209,6 +1199,7 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
// ... at least some portion of the message block chain should
// remain ...
+ ACE_ASSERT (message_block != 0);
}
// ... either the message must be queued or we need to queue it
@@ -1217,18 +1208,18 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
- ACE_TEXT ("message is queued\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, "
+ "message is queued\n",
+ this->id ()));
}
if (this->queue_message_i(message_block) == -1)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
- ACE_TEXT ("cannot queue message for ")
- ACE_TEXT (" - %m\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, "
+ "cannot queue message for "
+ " - %m\n",
+ this->id ()));
return -1;
}
@@ -1287,13 +1278,13 @@ TAO_Transport::queue_message_i(const ACE_Message_Block *message_block)
int
TAO_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Time_Value * max_wait_time,
- int /* block */ /* deprecated parameter */ )
+ int /*block*/)
{
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::handle_input\n",
+ this->id ()));
}
// First try to process messages of the head of the incoming queue.
@@ -1306,768 +1297,853 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
- ACE_TEXT ("error while parsing the head of the queue\n"),
- this->id()));
-
+ "TAO (%P|%t) - Transport[%d]::handle_input, "
+ "error while parsing the head of the queue\n",
+ this->id()));
}
- return -1;
}
- else
- {
- // retval == 0
- // Processed a message in queue successfully. This
- // thread must return to thread-pool now.
- return 0;
- }
+ return retval;
}
- TAO_Queued_Data *q_data = 0;
+ // If there are no messages then we can go ahead to read from the
+ // handle for further reading..
+
+ // The buffer on the stack which will be used to hold the input
+ // messages
+ char buf [TAO_MAXBUFSIZE];
+
+#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
+ (void) ACE_OS::memset (buf,
+ '\0',
+ sizeof buf);
+#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
+
+ // Create a data block
+ ACE_Data_Block db (sizeof (buf),
+ ACE_Message_Block::MB_DATA,
+ buf,
+ this->orb_core_->input_cdr_buffer_allocator (),
+ this->orb_core_->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->input_cdr_dblock_allocator ());
+
+ // Create a message block
+ ACE_Message_Block message_block (&db,
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->input_cdr_msgblock_allocator ());
- if (this->incoming_message_stack_.top (q_data) != -1
- && q_data->missing_data_ != TAO_MISSING_DATA_UNDEFINED)
+
+ // Align the message block
+ ACE_CDR::mb_align (&message_block);
+
+ size_t recv_size = 0;
+
+ if (this->orb_core_->orb_params ()->single_read_optimization ())
{
- /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */
- if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
- {
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
- ACE_TEXT ("error consolidating incoming message\n"),
- this->id ()));
- }
- return -1;
- }
+ recv_size =
+ message_block.space ();
}
else
{
- if (this->handle_input_parse_data (rh, max_wait_time) == -1)
- {
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
- ACE_TEXT ("error parsing incoming message\n"),
- this->id ()));
- }
- return -1;
- }
+ recv_size =
+ this->messaging_object ()->header_length ();
}
- return 0;
-}
-
-int
-TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
- TAO_Resume_Handle &rh)
-{
- // paranoid check
- if (q_data->missing_data_ != 0)
+ // If we have a partial message, copy it into our message block
+ // and clear out the partial message.
+ if (this->partial_message_ != 0 && this->partial_message_->length () != 0)
{
- if (TAO_debug_level > 0)
+ if (message_block.copy (this->partial_message_->rd_ptr (),
+ this->partial_message_->length ()) == 0)
{
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
- ACE_TEXT ("missing data\n"),
- this->id ()));
+ recv_size -= this->partial_message_->length ();
+ this->partial_message_->reset ();
}
- return -1;
- }
-
- if (q_data->more_fragments_ ||
- q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- {
- // consolidate message on top of stack, only for fragmented messages
- TAO_Queued_Data *new_q_data = 0;
-
- switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
+ else
{
- case -1: // error
- return -1;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::handle_input, "
+ "unable to copy the partial message\n",
+ this->id ()),
+ -1);
+ }
+ }
- case 0: // returning consolidated message in q_data
- if (!new_q_data)
- {
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
- ACE_TEXT ("error, consolidated message is NULL\n"),
- this->id ()));
- }
- return -1;
- }
+ // Saving the size of the received buffer in case any one needs to
+ // get the size of the message thats received in the
+ // context. Obviously the value will be changed for each recv call
+ // and the user is supposed to invoke the accessor only in the
+ // invocation context to get meaningful information.
+ this->recv_buffer_size_ = recv_size;
+ // Read the message into the message block that we have created on
+ // the stack.
+ ssize_t n = this->recv (message_block.wr_ptr (),
+ recv_size,
+ max_wait_time);
- if (this->process_parsed_messages (new_q_data, rh) == -1)
- {
- TAO_Queued_Data::release (new_q_data);
+ // If there is an error return to the reactor..
+ if (n <= 0)
+ {
+ return n;
+ }
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
- ACE_TEXT ("error processing consolidated message\n"),
- this->id ()));
- }
- return -1;
- }
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::handle_input, "
+ "read %d bytes\n",
+ this->id (), n));
+ }
- TAO_Queued_Data::release (new_q_data);
+ // Set the write pointer in the stack buffer
+ message_block.wr_ptr (n);
- break;
+ // Parse the message and try consolidating the message if
+ // needed.
+ retval = this->parse_consolidate_messages (message_block,
+ rh,
+ max_wait_time);
- case 1: // fragment has been stored in messaging_oject()
- break;
+ if (retval <= 0)
+ {
+ if (retval == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::handle_input, "
+ "error while parsing and consolidating\n",
+ this->id ()));
}
+ return retval;
}
- else
+
+ if (message_block.length () > 0)
{
- if (this->process_parsed_messages (q_data, rh) == -1)
+ // Make a node of the message block..
+ TAO_Queued_Data qd (&message_block,
+ this->orb_core_->transport_message_buffer_allocator ());
+
+ // Extract the data for the node..
+ this->messaging_object ()->get_message_data (&qd);
+
+ // Check whether the message was fragmented..
+ if (qd.more_fragments_ ||
+ (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
{
- TAO_Queued_Data::release (q_data);
+ // Duplicate the node that we have as the node is on stack..
+ TAO_Queued_Data *nqd =
+ TAO_Queued_Data::duplicate (qd);
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
- ACE_TEXT ("error processing message\n"),
- this->id ()));
- }
- return -1;
+ return this->consolidate_fragments (nqd, rh);
}
- TAO_Queued_Data::release (q_data);
-
+ // Process the message
+ return this->process_parsed_messages (&qd,
+ rh);
}
return 0;
}
int
-TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
+TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
{
- // consolidate message on top of stack, only for fragmented messages
-
- // paranoid check
- if (q_data->missing_data_ != 0)
+ // Parse the incoming message for validity. The check needs to be
+ // performed by the messaging objects.
+ switch (this->parse_incoming_messages (block))
{
- return -1;
- }
+ // An error has occurred during message parsing
+ case -1:
+ return -1;
- if (q_data->more_fragments_ ||
- q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- {
- TAO_Queued_Data *new_q_data = 0;
+ // This message block does not contain enough data to
+ // parse the header. We do not need to grow the partial
+ // message block since we are guaranteed that it can hold
+ // at least a GIOP header plus a GIOP fragment header.
+ case 1:
+ if (this->partial_message_ == 0)
+ {
+ this->allocate_partial_message_block ();
+ }
- switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
+ if (this->partial_message_ != 0 &&
+ this->partial_message_->copy (block.rd_ptr (),
+ block.length ()) == 0)
{
- case -1: // error
- return -1;
+ block.rd_ptr (block.length ());
+ return 0;
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::parse_consolidate_messages, "
+ "unable to save the partial message\n",
+ this->id ()),
+ -1);
+ }
- case 0: // returning consolidated message in new_q_data
- if (!new_q_data)
- {
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
- ACE_TEXT ("error, consolidated message is NULL\n"),
- this->id ()));
- }
- return -1;
- }
+ case 0: // The normal case
+ break;
- if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
- {
- TAO_Queued_Data::release (new_q_data);
- return -1;
- }
- break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::parse_consolidate_messages, "
+ "impossible return value from parse_incoming_messages\n",
+ this->id ()),
+ -1);
+ }
- case 1: // fragment has been stored in messaging_oject()
- break;
- }
+ // Check whether we have a complete message for processing
+ const ssize_t missing_data = this->missing_data (block);
+
+ if (missing_data < 0)
+ {
+ // If we have more than one message
+ return this->consolidate_extra_messages (block,
+ rh);
}
- else
+ else if (missing_data > 0)
{
- if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
- {
- TAO_Queued_Data::release (q_data);
- return -1;
- }
+ // If we have missing data then try doing a read or try queueing
+ // them.
+ return this->consolidate_message (block,
+ missing_data,
+ rh,
+ max_wait_time);
}
- return 0; // success
+ return 1;
}
int
-TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
- ACE_Time_Value * max_wait_time,
- TAO_Queued_Data *q_data)
+TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
{
- // paranoid check
- if (q_data == 0)
- {
- return -1;
- }
+ // If we have a queue and if the last message is not complete a
+ // complete one, then this read will get us the remaining data. So
+ // do not try to parse the header if we have an incomplete message
+ // in the queue.
+ if (this->incoming_message_queue_.is_tail_complete () != 0)
+ {
+ // As it looks like a new message has been read, process the
+ // message. Call the messaging object to do the parsing..
+ int retval =
+ this->messaging_object ()->parse_incoming_messages (block);
+
+ if (retval == -1 && TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, "
+ "error in incoming message\n",
+ this->id ()));
+ }
- if (TAO_debug_level > 3)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
- ACE_TEXT ("enter (missing data == %d)\n"),
- this->id (), q_data->missing_data_));
+ return retval;
}
- const size_t recv_size = q_data->missing_data_;
+ return 0;
+}
- // make sure the message_block has enough space
- const size_t message_size = recv_size
- + q_data->msg_block_->length();
- if (q_data->msg_block_->space() < recv_size)
+ssize_t
+TAO_Transport::missing_data (ACE_Message_Block &incoming)
+{
+ // If we have a incomplete message in the queue then find out how
+ // much of data is required to get a complete message.
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
{
- if (ACE_CDR::grow (q_data->msg_block_, message_size) == -1)
- {
- return -1;
- }
+ return this->incoming_message_queue_.missing_data_tail ();
}
- // Saving the size of the received buffer in case any one needs to
- // get the size of the message thats received in the
- // context. Obviously the value will be changed for each recv call
- // and the user is supposed to invoke the accessor only in the
- // invocation context to get meaningful information.
- this->recv_buffer_size_ = recv_size;
-
- // Read the message into the existing message block on heap
- const ssize_t n = this->recv (q_data->msg_block_->wr_ptr(),
- recv_size,
- max_wait_time);
+ return this->messaging_object ()->missing_data (incoming);
+}
- if (n <= 0)
+int
+TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ // Check whether the last message in the queue is complete..
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
{
- return n;
+ return this->consolidate_message_queue (incoming,
+ missing_data,
+ rh,
+ max_wait_time);
}
- if (TAO_debug_level > 3)
+ if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
- ACE_TEXT ("read bytes %d\n"),
- this->id (), n));
+ "TAO (%P|%t) - Transport[%d]::consolidate_message\n",
+ this->id ()));
}
- q_data->msg_block_->wr_ptr(n);
- q_data->missing_data_ -= n;
+ // Calculate the actual length of the load that we are supposed to
+ // read which is equal to the <missing_data> + length of the buffer
+ // that we have..
+ const size_t payload = missing_data + incoming.size ();
+
+ // Grow the buffer to the size of the message
+ ACE_CDR::grow (&incoming,
+ payload);
- if (q_data->missing_data_ == 0)
+ ssize_t n = 0;
+
+ // As this used for transports where things are available in one
+ // shot this looping should not create any problems.
+ for (ssize_t bytes = missing_data; bytes != 0; bytes -= n)
{
- // paranoid check
- if (this->incoming_message_stack_.pop (q_data) == -1)
+ // .. do a read on the socket again.
+ n = this->recv (incoming.wr_ptr (),
+ bytes,
+ max_wait_time);
+
+ if (TAO_debug_level > 6)
{
- return -1;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "read %d bytes on attempt\n",
+ this->id(), n));
}
- if (this->consolidate_process_message (q_data, rh) == -1)
+ if (n == 0 || n == -1)
{
- return -1;
+ break;
}
- }
-
- return 0;
-}
-
-int
-TAO_Transport::handle_input_parse_extra_messages (ACE_Message_Block &message_block)
-{
-
- // store buffer status of last extraction: -1 parse error, 0
- // incomplete message header in buffer, 1 complete messages header
- // parsed
- int buf_status = 0;
-
- TAO_Queued_Data *q_data = 0; // init
+ incoming.wr_ptr (n);
+ missing_data -= n;
+ }
- // parse buffer until all messages have been extracted, consolidate
- // and enqueue complete messages, if the last message being parsed
- // has missin data, it is stays on top of incoming_message_stack.
- while (message_block.length () > 0 &&
- (buf_status = this->messaging_object ()->extract_next_message
- (message_block, q_data)) != -1 &&
- q_data != 0) // paranoid check
+ // If we got an error..
+ if (n == -1)
{
- if (q_data->missing_data_ == 0)
- {
- if (this->consolidate_enqueue_message (q_data) == -1)
- {
- return -1;
- }
- }
- else // incomplete message read, probably the last message in buffer
+ if (TAO_debug_level > 4)
{
- // can not fail
- this->incoming_message_stack_.push (q_data);
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "error while trying to consolidate\n",
+ this->id ()));
}
- q_data = 0; // reset
- } // while
-
- if (buf_status == -1)
- {
return -1;
}
- return 0;
-}
-
-int
-TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh,
- ACE_Time_Value * max_wait_time)
-{
+ // If we had gotten a EWOULDBLOCK n would be equal to zero. But we
+ // have to put the message in the queue anyway. So let us proceed
+ // to do that and return...
- if (TAO_debug_level > 3)
+ // Check to see if we have messages in queue or if we have missing
+ // data . AT this point we cannot have have semi-complete messages
+ // in the queue as they would have been taken care before. Put
+ // ourselves in the queue and then try processing one of the
+ // messages..
+ if (missing_data >= 0 ||
+ this->incoming_message_queue_.queue_length () != 0)
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
- ACE_TEXT ("enter\n"),
- this->id ()));
- }
+ if (missing_data == 0 ||
+ !this->incoming_message_queue_.is_tail_fragmented ())
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "queueing up the message\n",
+ this->id ()));
+ }
+ // Get a queued data
+ TAO_Queued_Data *qd =
+ this->make_queued_data (incoming);
- // The buffer on the stack which will be used to hold the input
- // messages, ACE_CDR::MAX_ALIGNMENT compensates the
- // memory-alignment. This improves performance with SUN-Java-ORB-1.4
- // and higher that sends fragmented requests of size 1024 bytes.
- char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
+ // Add the missing data to the queue
+ qd->missing_data_ = missing_data;
-#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
- (void) ACE_OS::memset (buf,
- '\0',
- sizeof buf);
-#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
-
- // Create a data block
- ACE_Data_Block db (sizeof (buf),
- ACE_Message_Block::MB_DATA,
- buf,
- this->orb_core_->input_cdr_buffer_allocator (),
- this->orb_core_->locking_strategy (),
- ACE_Message_Block::DONT_DELETE,
- this->orb_core_->input_cdr_dblock_allocator ());
-
- // Create a message block
- ACE_Message_Block message_block (&db,
- ACE_Message_Block::DONT_DELETE,
- this->orb_core_->input_cdr_msgblock_allocator ());
+ // Get the rest of the messaging data
+ this->messaging_object ()->get_message_data (qd);
+ // If this is a full GIOP fragment, then we need only
+ // to consolidate the fragments
+ if (missing_data == 0 &&
+ (qd->more_fragments_ ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ this->consolidate_fragments (qd, rh);
+ }
+ else
+ {
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
- // Align the message block
- ACE_CDR::mb_align (&message_block);
+ if (this->incoming_message_queue_.is_head_complete ())
+ {
+ return this->process_queue_head (rh);
+ }
+ }
+ }
+ else
+ {
+ // This block of code will only come into play when GIOP
+ // message fragmentation is employed. If we have a fragment
+ // in the message queue, we can only chain message blocks
+ // onto the TAO_Queued_Data for that fragment. Unless we have
+ // a full GIOP fragment, and since we know we're missing data,
+ // we need to save what we have until we can read in some more of
+ // the fragment until we get it all. This bit of data could be
+ // larger than what the partial message block can hold, so we may
+ // need to grow the partial message block.
+ if (this->partial_message_ == 0)
+ {
+ this->allocate_partial_message_block ();
+ }
- size_t recv_size = 0; // Note: unsigned integer
+ if (this->partial_message_ != 0)
+ {
+ const size_t incoming_length = incoming.length ();
+ ACE_CDR::grow (this->partial_message_,
+ incoming_length);
+ if (this->partial_message_->copy (incoming.rd_ptr (),
+ incoming_length) == 0)
+ {
+ incoming.rd_ptr (incoming_length);
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "unable to save the partial message\n",
+ this->id ()),
+ -1);
+ }
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "unable to allocate the partial message\n",
+ this->id ()),
+ -1);
+ }
+ }
- // Pointer to newly parsed message
- TAO_Queued_Data *q_data = 0;
+ return 0;
+ }
- // optimizing access of constants
- const size_t header_length =
- this->messaging_object ()->header_length ();
+ // We don't have any missing data. Just make a queued_data node with
+ // the existing message block and send it to the higher layers of
+ // the ORB.
+ TAO_Queued_Data pqd (&incoming,
+ this->orb_core_->transport_message_buffer_allocator ());
+ pqd.missing_data_ = missing_data;
+ this->messaging_object ()->get_message_data (&pqd);
- // paranoid check
- if (header_length > message_block.space ())
+ // Check whether the message was fragmented and try to consolidate
+ // the fragments..
+ if (pqd.more_fragments_ ||
+ (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
{
- return -1;
- }
+ // Duplicate the queued data as it is on stack..
+ TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
- if (this->orb_core_->orb_params ()->single_read_optimization ())
- {
- recv_size =
- message_block.space ();
+ return this->consolidate_fragments (nqd, rh);
}
- else
+
+ // Now we have a full message in our buffer. Just go ahead and
+ // process that
+ return this->process_parsed_messages (&pqd,
+ rh);
+}
+
+int
+TAO_Transport::consolidate_fragments (TAO_Queued_Data *queueable_message,
+ TAO_Resume_Handle &rh)
+{
+ // Get the version numbers
+ CORBA::Octet major = queueable_message->major_version_;
+ CORBA::Octet minor = queueable_message->minor_version_;
+ CORBA::UShort whole = major << 8 | minor;
+
+ switch(whole)
{
- // Single read optimization has been de-activated. That means
- // that we need to read from transport the GIOP header first
- // before the payload. This codes first checks the incoming
- // stack for partial messages which needs to be
- // consolidated. Otherwise we are in new cycle, reading complete
- // GIOP header of new incoming message.
- if (this->incoming_message_stack_.top (q_data) != -1
- && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
+ case 0x0100:
+ if (!queueable_message->more_fragments_)
{
- // There is a partial message on incoming_message_stack_
- // whose length is unknown so far. We need to consolidate
- // the GIOP header to get to know the payload size,
- recv_size = header_length - q_data->msg_block_->length ();
+ this->incoming_message_queue_.enqueue_tail (queueable_message);
}
else
{
- // Read amount of data forming GIOP header of new incoming
- // message.
- recv_size = header_length;
+ // Fragments aren't supported in 1.0. This is an error and
+ // we should reject it somehow. What do we do here? Do we throw
+ // an exception to the receiving side? Do we throw an exception
+ // to the sending side?
+ //
+ // At the very least, we need to log the fact that we received
+ // nonsense.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_Transport::enqueue_incoming_message ")
+ ACE_TEXT("detected a fragmented GIOP 1.0 message\n")),
+ -1);
}
- // POST: 0 <= recv_size <= header_length
+ break;
+ case 0x0101:
+ {
+ // One note is that TAO_Queued_Data contains version numbers,
+ // but doesn't indicate the actual protocol to which those
+ // version numbers refer. That's not a problem, though, because
+ // instances of TAO_Queued_Data live in a queue, and that queue
+ // lives in a particular instance of a Transport, and the
+ // transport instance has an association with a particular
+ // messaging_object. The concrete messaging object embodies a
+ // messaging protocol, and must cover all versions of that
+ // protocol. Therefore, we just need to cover the bases of all
+ // versions of that one protocol.
+
+ // In 1.1, fragments kinda suck because they don't have they're
+ // own message-specific header. Therefore, we have to find the
+ // fragment based on the major and minor version.
+ TAO_Queued_Data* fragment_message_chain =
+ this->incoming_message_queue_.find_fragment_chain (major, minor);
+
+ // Deal with the fragment and the queueable message
+ this->process_fragment (fragment_message_chain,
+ queueable_message,
+ major, minor, rh);
+ break;
+ }
+ case 0x0102:
+ {
+ // In 1.2, we get a little more context. There's a
+ // FRAGMENT message-specific header, and inside that is the
+ // request id with which the fragment is associated.
+ TAO_Queued_Data* fragment_message_chain =
+ this->incoming_message_queue_.find_fragment_chain (
+ queueable_message->request_id_);
+
+ // Deal with the fragment and the queueable message
+ this->process_fragment (fragment_message_chain,
+ queueable_message,
+ major, minor, rh);
+ break;
+ }
+ default:
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_Transport::consolidate_fragments ")
+ ACE_TEXT("can not handle a GIOP %d.%d ")
+ ACE_TEXT("message\n"), major, minor));
+ ACE_HEX_DUMP ((LM_DEBUG,
+ queueable_message->msg_block_->rd_ptr (),
+ queueable_message->msg_block_->length ()));
+ return -1;
}
- // POST: 0 <= recv_size <= message_block->space ()
- // If we have a partial message, copy it into our message block and
- // clear out the partial message.
- if (this->partial_message_ != 0 && this->partial_message_->length () > 0)
+ return 0;
+}
+
+void
+TAO_Transport::process_fragment (TAO_Queued_Data* fragment_message_chain,
+ TAO_Queued_Data* queueable_message,
+ CORBA::Octet major,
+ CORBA::Octet minor,
+ TAO_Resume_Handle &rh)
+{
+ // No fragment was found
+ if (fragment_message_chain == 0)
{
- // (*) Copy back the partial message into current read-buffer,
- // verify that the read-strategy of "recv_size" bytes is not
- // exceeded. The latter check guarantees that recv_size does not
- // roll-over and keeps in range
- // 0<=recv_size<=message_block->space()
- if (this->partial_message_->length () <= recv_size &&
- message_block.copy (this->partial_message_->rd_ptr (),
- this->partial_message_->length ()) == 0)
+ this->incoming_message_queue_.enqueue_tail (queueable_message);
+ }
+ else
+ {
+ if (fragment_message_chain->major_version_ != major ||
+ fragment_message_chain->minor_version_ != minor)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_Transport::process_fragment ")
+ ACE_TEXT("GIOP versions do not match ")
+ ACE_TEXT("(%d.%d != %d.%d\n"),
+ fragment_message_chain->major_version_,
+ fragment_message_chain->minor_version_,
+ major, minor));
+
+ // Find the last message block in the continuation
+ ACE_Message_Block* mb = fragment_message_chain->msg_block_;
+ while (mb->cont () != 0)
+ mb = mb->cont ();
+
+ // Add the current message block to the end of the chain
+ // after adjusting the read pointer to skip the header(s)
+ const size_t header_adjustment =
+ this->messaging_object ()->header_length () +
+ this->messaging_object ()->fragment_header_length (major, minor);
+ queueable_message->msg_block_->rd_ptr(header_adjustment);
+ mb->cont (queueable_message->msg_block_);
+
+ // Remove our reference to the message block. At this point
+ // the message block of the fragment head owns it as part of a
+ // chain
+ queueable_message->msg_block_ = 0;
+
+ if (!queueable_message->more_fragments_)
{
+ // This is the end of the fragments for this request
+ fragment_message_chain->consolidate ();
- recv_size -= this->partial_message_->length ();
- this->partial_message_->reset ();
- }
- else
- {
- return -1;
+ // Process the queue head to make sure that the newly
+ // consolidated fragments get handled
+ this->process_queue_head (rh);
}
+
+ // Get rid of the queuable message
+ TAO_Queued_Data::release (queueable_message);
}
- // POST: 0 <= recv_size <= buffer_space
+}
- if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
+int
+TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ if (TAO_debug_level > 4)
{
- // This event would cause endless looping, trying frequently to
- // read zero bytes from stream. This might happen, if TAOs
- // protocol implementation is not correct and tries to read data
- // beyond header without "single_read_optimazation" being
- // activated.
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
- ACE_TEXT ("Error - endless loop detection, closing connection"),
- this->id ()));
- }
- return -1;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n",
+ this->id ()));
}
- // Saving the size of the received buffer in case any one needs to
- // get the size of the message thats received in the
- // context. Obviously the value will be changed for each recv call
- // and the user is supposed to invoke the accessor only in the
- // invocation context to get meaningful information.
- this->recv_buffer_size_ = recv_size;
-
- // Read the message into the message block that we have created on
- // the stack.
- const ssize_t n = this->recv (message_block.wr_ptr (),
- recv_size,
- max_wait_time);
+ // If the queue did not have a complete message put this piece of
+ // message in the queue. We know it did not have a complete
+ // message. That is why we are here.
+ const size_t n =
+ this->incoming_message_queue_.copy_tail (incoming);
- // If there is an error return to the reactor..
- if (n <= 0)
+ if (TAO_debug_level > 6)
{
- return n;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "copied [%d] bytes to the tail\n",
+ this->id (),
+ n));
}
- if (TAO_debug_level > 3)
+ // Update the missing data...
+ missing_data =
+ this->incoming_message_queue_.missing_data_tail ();
+
+ if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
- ACE_TEXT ("read %d bytes\n"),
- this->id (), n));
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "missing [%d] bytes in the tail message\n",
+ this->id (),
+ missing_data));
}
- // Set the write pointer in the stack buffer
- message_block.wr_ptr (n);
-
- //
- // STACK PROCESSING OR MESSAGE CONSOLIDATION
- //
-
- // PRE: data in buffer is aligned && message_block.length() > 0
+ // Move the read pointer of the <incoming> message block to the end
+ // of the copied message and process the remaining portion...
+ incoming.rd_ptr (n);
- if (this->incoming_message_stack_.top (q_data) != -1
- && q_data->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
+ // If we have some more information left in the message block..
+ if (incoming.length ())
{
- //
- // MESSAGE CONSOLIDATION
- //
+ // We may have to parse & consolidate. This part of the message
+ // doesn't seem to be part of the last message in the queue (as
+ // the copy () hasn't taken away this message).
+ const int retval = this->parse_consolidate_messages (incoming,
+ rh,
+ max_wait_time);
- // Partial message on incoming_message_stack_ needs to be
- // consolidated. The message header could not be parsed so far
- // and therefor the message size is unknown yet. Consolidating
- // the message destroys the memory alignment of succeeding
- // messages sharing the buffer, for that reason consolidation
- // and stack based processing are mutial exclusive.
- if (this->messaging_object ()->consolidate_node (q_data,
- message_block) == -1)
+ // If there is an error return
+ if (retval == -1)
{
- if (TAO_debug_level > 0)
+ if (TAO_debug_level)
{
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
- ACE_TEXT ("error consolidating message from input buffer\n"),
- this->id () ));
- }
- return -1;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "error while consolidating, part of the read message\n",
+ this->id ()));
+ }
+ return retval;
}
-
- // Complete message are to be enqueued and later processed
- if (q_data->missing_data_ == 0)
+ else if (retval == 1)
{
- if (this->incoming_message_stack_.pop (q_data) == -1)
- {
- return -1;
- }
+ // If the message in the <incoming> message block has only
+ // one message left we need to process that seperately.
+
+ // Get a queued data
+ TAO_Queued_Data *qd = this->make_queued_data (incoming);
+
+ // Get the rest of the message data
+ this->messaging_object ()->get_message_data (qd);
- if (this->consolidate_enqueue_message (q_data) == -1)
+ // Add the missing data to the queue
+ qd->missing_data_ = 0;
+
+ // Check whether the message was fragmented and try to consolidate
+ // the fragments..
+ if (qd->more_fragments_
+ || (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
{
- return -1;
+ return this->consolidate_fragments (qd, rh);
}
- }
- if (message_block.length () > 0
- && this->handle_input_parse_extra_messages (message_block) == -1)
- {
- return -1;
- }
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
- // In any case try to process the enqueued messages
- if (this->process_queue_head (rh) == -1)
- {
- return -1;
+ // We should surely have a message in queue now. So just
+ // process that.
+ return this->process_queue_head (rh);
}
- }
- else
- {
- //
- // STACK PROCESSING (critical path)
- //
- // Process the first message in buffer on stack
+ // parse_consolidate_messages () would have processed one of the
+ // messages, so we better return as we dont want to starve other
+ // threads.
+ return 0;
+ }
- // (PRE: first message resides in aligned memory) Make a node of
- // the message-block..
+ // If we still have some missing data..
+ if (missing_data > 0)
+ {
+ // Get the last message from the Queue
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_tail ();
- TAO_Queued_Data qd (&message_block,
- this->orb_core_->transport_message_buffer_allocator ());
+ if (TAO_debug_level > 5)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "trying recv, again\n",
+ this->id ()));
+ }
- size_t mesg_length = 0;
+ // Try to do a read again. If we have some luck it would be
+ // great..
+ const ssize_t n = this->recv (qd->msg_block_->wr_ptr (),
+ missing_data,
+ max_wait_time);
- if (this->messaging_object ()->parse_next_message (message_block,
- qd,
- mesg_length) == -1
- || (qd.missing_data_ == 0
- && mesg_length > message_block.length ()) )
+ if (TAO_debug_level > 5)
{
- // extracting message failed
- return -1;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "recv retval [%d]\n",
+ this->id (),
+ n));
}
- // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
- // This prevents seeking rd_ptr behind the wr_ptr
- if (qd.missing_data_ != 0 ||
- qd.more_fragments_ ||
- qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ // Error...
+ if (n < 0)
{
- if (qd.missing_data_ == 0)
- {
- // Dealing with a fragment
- TAO_Queued_Data *nqd =
- TAO_Queued_Data::duplicate (qd);
-
- if (nqd == 0)
- {
- return -1;
- }
+ return n;
+ }
- // mark the end of message in new buffer
- char* end_mark = nqd->msg_block_->rd_ptr ()
- + mesg_length;
- nqd->msg_block_->wr_ptr (end_mark);
+ // If we get a EWOULDBLOCK ie. n==0, we should anyway put the
+ // message in queue before returning..
+ // Move the write pointer
+ qd->msg_block_->wr_ptr (n);
- // move the read pointer forward in old buffer
- message_block.rd_ptr (mesg_length);
+ // Decrement the missing data
+ qd->missing_data_ -= n;
- // enqueue the message
- if (this->consolidate_enqueue_message (nqd) == -1)
- {
- return -1;
- }
+ // Now put the TAO_Queued_Data back in the queue
+ this->incoming_message_queue_.enqueue_tail (qd);
- if (message_block.length () > 0
- && this->handle_input_parse_extra_messages (message_block) == -1)
- {
- return -1;
- }
+ // Any way as we have come this far and are about to return,
+ // just try to process a message if it is there in the queue.
+ if (this->incoming_message_queue_.is_head_complete ())
+ {
+ return this->process_queue_head (rh);
+ }
- // In any case try to process the enqueued messages
- if (this->process_queue_head (rh) == -1)
- {
- return -1;
- }
- }
- else if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED)
- {
- // Incomplete message, must be the last one in buffer
+ return 0;
+ }
- if (qd.missing_data_ != TAO_MISSING_DATA_UNDEFINED &&
- qd.missing_data_ > message_block.space ())
- {
- // Re-Allocate correct size on heap
- if (ACE_CDR::grow (qd.msg_block_,
- message_block.length ()
- + qd.missing_data_) == -1)
- {
- return -1;
- }
- }
+ // Process a message in the head of the queue if we have one..
+ return this->process_queue_head (rh);
+}
- TAO_Queued_Data *nqd =
- TAO_Queued_Data::duplicate (qd);
- if (nqd == 0)
- {
- return -1;
- }
+int
+TAO_Transport::consolidate_extra_messages (ACE_Message_Block
+ &incoming,
+ TAO_Resume_Handle &rh)
+{
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n",
+ this->id ()));
+ }
- // move read-pointer to end of buffer
- message_block.rd_ptr (message_block.length());
+ // Pick the tail of the queue
+ TAO_Queued_Data *tail =
+ this->incoming_message_queue_.dequeue_tail ();
- this->incoming_message_stack_.push (nqd);
- }
- }
- else
+ if (tail)
+ {
+ // If we have a node in the tail, checek to see whether it needs
+ // consolidation. If so, just consolidate it.
+ if (this->messaging_object ()->consolidate_node (tail, incoming) == -1)
{
- //
- // critical path
- //
-
- // We cant process the message on stack right now. First we
- // have got to parse extra messages from message_block,
- // putting them into queue. When this is done we can return
- // to process this message, and notifying other threads to
- // process the messages in queue.
-
- char * end_marker = message_block.rd_ptr ()
- + mesg_length;
-
- if (message_block.length () > mesg_length)
- {
- // There are more message in data stream to be parsed.
- // Safe the rd_ptr to restore later.
- char *rd_ptr_stack_mesg = message_block.rd_ptr ();
-
- // Skip parsed message, jump to next message in buffer
- // PRE: mesg_length <= message_block.length ()
- message_block.rd_ptr (mesg_length);
+ return -1;
+ }
- // Extract remaining messages and enqueue them for later
- // heap processing
- if (this->handle_input_parse_extra_messages (message_block) == -1)
- {
- return -1;
- }
+ // .. put the tail back in queue..
+ this->incoming_message_queue_.enqueue_tail (tail);
+ }
- // correct the end_marker
- end_marker = message_block.rd_ptr ();
+ int retval = 1;
- // Restore rd_ptr
- message_block.rd_ptr (rd_ptr_stack_mesg);
- }
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, "
+ "extracting extra messages\n",
+ this->id ()));
+ }
- // The following if-else has been copied from
- // process_queue_head(). While process_queue_head()
- // processes message on heap, here we will process a message
- // on stack.
+ // Extract messages..
+ while (retval == 1)
+ {
+ TAO_Queued_Data *q_data = 0;
- // Now that we have one message on stack to be processed,
- // check whether we have one more message in the queue...
- if (this->incoming_message_queue_.queue_length () > 0)
+ retval =
+ this->messaging_object ()->extract_next_message (incoming,
+ q_data);
+ if (q_data)
+ {
+ // If we have read a framented message then...
+ if (q_data->more_fragments_ ||
+ q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
{
- if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
- ACE_TEXT ("notify reactor\n"),
- this->id ()));
-
- }
-
- const int retval = this->notify_reactor ();
-
- if (retval == 1)
- {
- // Let the class know that it doesn't need to resume the
- // handle..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
- }
- else if (retval < 0)
- return -1;
+ this->consolidate_fragments (q_data, rh);
}
else
{
- // As there are no further messages in queue just resume
- // the handle. Set the flag incase someone had reset the flag..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
- }
-
- // PRE: incoming_message_queue is empty
- if (this->process_parsed_messages (&qd,
- rh) == -1)
- {
- return -1;
+ this->incoming_message_queue_.enqueue_tail (q_data);
}
-
- // move the rd_ptr tp position of end_marker
- message_block.rd_ptr (end_marker);
}
}
- // Now that all cases have been processed, there might be kept some data
- // in buffer that needs to be safed for next "handle_input" invocations.
- if (message_block.length () > 0)
- {
- if (this->partial_message_ == 0)
- {
- this->allocate_partial_message_block ();
- }
-
- if (this->partial_message_ != 0 &&
- this->partial_message_->copy (message_block.rd_ptr (),
- message_block.length ()) == 0)
- {
- message_block.rd_ptr (message_block.length ());
- }
- else
- {
- return -1;
- }
- }
-
- return 0;
-}
+ // In case of error return..
+ if (retval == -1)
+ {
+ return retval;
+ }
+ return this->process_queue_head (rh);
+}
int
TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
TAO_Resume_Handle &rh)
{
- if (TAO_debug_level > 7)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
- ACE_TEXT ("entering (missing data == %d)\n"),
- this->id(), qd->missing_data_));
- }
-
// Get the <message_type> that we have received
const TAO_Pluggable_Message_Type t = qd->msg_type_;
@@ -2077,9 +2153,9 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
- ACE_TEXT ("received CloseConnection message - %m\n"),
- this->id()));
+ "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
+ "received CloseConnection message - %m\n",
+ this->id()));
// Return a "-1" so that the next stage can take care of
// closing connection and the necessary memory management.
@@ -2113,46 +2189,22 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
- ACE_TEXT ("error in process_reply_message - %m\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
+ "error in process_reply_message - %m\n",
+ this->id ()));
return -1;
}
}
- else if (t == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST)
- {
- // The associated request might be incomplpete residing
- // fragmented in messaging object. We must make sure the
- // resources allocated by fragments are released.
-
- if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
- {
- if (TAO_debug_level > 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
- ACE_TEXT ("error processing CancelRequest\n"),
- this->id ()));
- }
- }
-
- // We are not able to cancel requests being processed already;
- // this is declared as optional feature by CORBA, and TAO does
- // not support this currently.
-
- // Just continue processing, CancelRequest does not mean to cut
- // off the connection.
- }
else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
{
- if (TAO_debug_level > 0)
+ if (TAO_debug_level)
{
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
- ACE_TEXT ("received MessageError, closing connection\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
+ "received MessageError, closing connection\n",
+ this->id ()));
}
return -1;
}
@@ -2161,18 +2213,68 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
return 0;
}
+TAO_Queued_Data *
+TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
+{
+ // Get an instance of TAO_Queued_Data
+ TAO_Queued_Data *qd =
+ TAO_Queued_Data::make_queued_data (
+ this->orb_core_->transport_message_buffer_allocator ());
+
+ // Get the flag for the details of the data block...
+ ACE_Message_Block::Message_Flags flg =
+ incoming.self_flags ();
+
+ if (ACE_BIT_DISABLED (flg,
+ ACE_Message_Block::DONT_DELETE))
+ {
+ // Duplicate the data block before putting it in the queue.
+ qd->msg_block_ = ACE_Message_Block::duplicate (&incoming);
+ }
+ else
+ {
+ // As we are in CORBA mode, all the data blocks would be aligned
+ // on an 8 byte boundary. Hence create a data block for more
+ // than the actual length
+ ACE_Data_Block *db =
+ this->orb_core_->create_input_cdr_data_block (incoming.length ()+
+ ACE_CDR::MAX_ALIGNMENT);
+
+ // Get the allocator..
+ ACE_Allocator *alloc =
+ this->orb_core_->input_cdr_msgblock_allocator ();
+
+ // Make message block..
+ ACE_Message_Block mb (db,
+ 0,
+ alloc);
+
+ // Duplicate the block..
+ qd->msg_block_ = mb.duplicate ();
+
+ // Align the message block
+ ACE_CDR::mb_align (qd->msg_block_);
+
+ // Copy the data..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ incoming.length ());
+ }
+
+ return qd;
+}
+
int
TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
{
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
- this->id (), this->incoming_message_queue_.queue_length () ));
+ "TAO (%P|%t) - Transport[%d]::process_queue_head\n",
+ this->id ()));
}
- // See if message in queue ...
- if (this->incoming_message_queue_.queue_length () > 0)
+ // See if the message in the head of the queue is complete...
+ if (this->incoming_message_queue_.is_head_complete () > 0)
{
// Get the message on the head of the queue..
TAO_Queued_Data *qd =
@@ -2181,21 +2283,21 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
- ACE_TEXT ("the size of the queue is [%d]\n"),
- this->id (),
- this->incoming_message_queue_.queue_length()));
+ "TAO (%P|%t) - Transport[%d]::process_queue_head, "
+ "the size of the queue is [%d]\n",
+ this->id (),
+ this->incoming_message_queue_.queue_length()));
}
// Now that we have pulled out out one message out of the queue,
// check whether we have one more message in the queue...
- if (this->incoming_message_queue_.queue_length () > 0)
+ if (this->incoming_message_queue_.is_head_complete () > 0)
{
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
- ACE_TEXT ("notify reactor\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::process_queue_head, "
+ "notify reactor\n",
+ this->id ()));
}
@@ -2248,9 +2350,9 @@ TAO_Transport::notify_reactor (void)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
- ACE_TEXT ("notify to Reactor\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::notify_reactor, "
+ "notify to Reactor\n",
+ this->id ()));
}
@@ -2263,9 +2365,9 @@ TAO_Transport::notify_reactor (void)
// @@todo: need to think about what is the action that
// we can take when we get here.
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
- ACE_TEXT ("notify to the reactor failed..\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::notify_reactor, "
+ "notify to the reactor failed..\n",
+ this->id ()));
}
return 1;
@@ -2366,10 +2468,10 @@ TAO_Transport::post_open (size_t id)
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_connect , ")
- ACE_TEXT ("could not register the transport ")
- ACE_TEXT ("in the reactor.\n"),
- this->id ()));
+ "TAO (%P|%t) - Transport[%d]::post_connect , "
+ "could not register the transport "
+ "in the reactor.\n",
+ this->id ()));
return false;
}
@@ -2384,11 +2486,7 @@ TAO_Transport::allocate_partial_message_block (void)
{
// This value must be at least large enough to hold a GIOP message
// header plus a GIOP fragment header
- const size_t partial_message_size =
- this->messaging_object ()->header_length ();
- // + this->messaging_object ()->fragment_header_length ();
- // deprecated, conflicts with not-single_read_opt.
-
+ const size_t partial_message_size = 16;
ACE_NEW (this->partial_message_,
ACE_Message_Block (partial_message_size));
}
@@ -2400,5 +2498,3 @@ TAO_Transport::allocate_partial_message_block (void)
*/
//@@ TAO_TRANSPORT_SPL_METHODS_ADD_HOOK
-
-TAO_END_VERSIONED_NAMESPACE_DECL