summaryrefslogtreecommitdiff
path: root/TAO/tao/Strategies/SHMIOP_Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Strategies/SHMIOP_Transport.cpp')
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp233
1 files changed, 149 insertions, 84 deletions
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index 6cd79c5620f..81c803e0a41 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -15,10 +15,9 @@
#include "tao/Stub.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"
-#include "tao/Resume_Handle.h"
-#include "tao/GIOP_Message_Base.h"
-#include "tao/GIOP_Message_Lite.h"
+#include "tao/GIOP_Message_Lite.h"
+#include "GIOP_Message_NonReactive_Base.h"
#if !defined (__ACE_INLINE__)
# include "SHMIOP_Transport.i"
@@ -44,7 +43,7 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Base (orb_core));
+ TAO_GIOP_Message_NonReactive_Base (orb_core));
}
}
@@ -92,97 +91,42 @@ TAO_SHMIOP_Transport::recv_i (char *buf,
size_t len,
const ACE_Time_Value *max_wait_time)
{
- ssize_t n = 0;
-
- int read_break = 0;
-
- while (!read_break)
- {
- n = this->connection_handler_->peer ().recv (buf,
- len,
- max_wait_time);
-
- // If we get a EWOULBLOCK we try to read again.
- if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
- {
- n = 0;
- continue;
- }
-
- // If there is anything else we just drop out of the loop.
- read_break = 1;
- }
-
- if (n == 0 || n == -1)
- {
- if (TAO_debug_level > 3)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p \n"),
- ACE_TEXT ("TAO - read message failure ")
- ACE_TEXT ("recv_i () \n")));
- }
- }
-
- return n;
-
+ return this->connection_handler_->peer ().recv (buf,
+ len,
+ max_wait_time);
}
-
int
-TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
+TAO_SHMIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
+ int block)
{
- // Calculate the actual length of the load that we are supposed to
- // read which is equal to the <missing_data> + length of the buffer
- // that we have..
- size_t payload = missing_data + incoming.length ();
-
- // Grow the buffer to the size of the message
- ACE_CDR::grow (&incoming,
- payload);
-
- // .. do a read on the socket again.
- ssize_t bytes = 0;
-
- // As this used for transports where things are available in one
- // shot this looping should not create any problems.
- for (size_t n = missing_data;
- n != 0;
- n -= bytes)
+ // Read the message of the socket
+ int result = this->messaging_object_->read_message (this,
+ block,
+ max_wait_time);
+
+ if (result == -1)
{
- // We would have liked to use something like a recv_n ()
- // here. But at the time when the code was written, the MEM_Stream
- // classes had poor support for recv_n (). Till a day when we
- // get proper recv_n (), let us stick with this. The other
- // argument that can be said against this is that, this is the
- // bad layer in which this is being done ie. recv_n is
- // simulated. But...
- bytes = this->recv (incoming.wr_ptr (),
- n,
- max_wait_time);
-
- if (bytes == 0 ||
- bytes == -1)
- {
- return -1;
- }
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("SHMIOP_Transport::read_message, failure in read_message ()")));
- incoming.wr_ptr (bytes);
+ this->tms_->connection_closed ();
+ return -1;
}
+ if (result < 2)
+ return result;
- TAO_Queued_Data pqd (&incoming);
+ // Now we know that we have been able to read the complete message
+ // here.. We loop here to see whether we have read more than one
+ // message in our read.
- // With SHMIOP we would not have any missing data...
- pqd.missing_data_ = 0;
+ // See we use the reactor semantics again
+ result = this->process_message ();
- this->messaging_object ()->get_message_data (&pqd);
- // Now we have a full message in our buffer. Just go ahead and
- // process that
- return this->process_parsed_messages (&pqd, rh);
+ return result;
}
@@ -276,6 +220,127 @@ TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major,
return 1;
}
+int
+TAO_SHMIOP_Transport::process_message (void)
+{
+ // Check whether we have messages for processing
+ int retval =
+ this->messaging_object_->more_messages ();
+
+ // The messages are fragmented, so we go back to the reactor.
+ if (retval <= 0)
+ return retval;
+
+ // Get the <message_type> that we have received
+ TAO_Pluggable_Message_Type t =
+ this->messaging_object_->message_type ();
+
+
+ int result = 0;
+ if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("Close Connection Message recd \n")));
+
+ this->tms_->connection_closed ();
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
+ {
+ if (this->messaging_object_->process_request_message (this,
+ this->orb_core ()) == -1)
+ return -1;
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
+ {
+ TAO_Pluggable_Reply_Params params (this->orb_core ());
+ if (this->messaging_object_->process_reply_message (params) == -1)
+ {
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("SHMIOP_Transport::process_message, process_reply_message ()")));
+
+ this->messaging_object_->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+
+ result =
+ this->tms_->dispatch_reply (params);
+
+ // @@ Somehow it seems dangerous to reset the state *after*
+ // dispatching the request, what if another threads receives
+ // another reply in the same connection?
+ // My guess is that it works as follows:
+ // - For the exclusive case there can be no such thread.
+ // - The the muxed case each thread has its own message_state.
+ // I'm pretty sure this comment is right. Could somebody else
+ // please look at it and confirm my guess?
+
+ // @@ The above comment was found in the older versions of the
+ // code. The code was also written in such a way that, when
+ // the client thread on a call from handle_input () from the
+ // reactor a call would be made on the handle_client_input
+ // (). The implementation of handle_client_input () looked so
+ // flaky. It used to create a message state upon entry in to
+ // the function using the TMS and destroy that on exit. All
+ // this was fine _theoretically_ for multiple threads. But
+ // the flakiness was originating in the implementation of
+ // get_message_state () where we were creating message state
+ // only once and dishing it out for every thread till one of
+ // them destroy's it. So, it looked broken. That has been
+ // changed. Why?. To my knowledge, the reactor does not call
+ // handle_input () on two threads at the same time. So, IMHO
+ // that defeats the purpose of creating a message state for
+ // every thread. This is just my guess. If we run in to
+ // problems this place needs to be revisited. If someone else
+ // is going to take a look please contact bala@cs.wustl.edu
+ // for details on this-- Bala
+
+ if (result == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) : SHMIOP_Transport::")
+ ACE_TEXT ("process_message - ")
+ ACE_TEXT ("dispatch reply failed\n")));
+ this->messaging_object_->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+ if (result == 0)
+ {
+ this->messaging_object_->reset ();
+
+ // The reply dispatcher was no longer registered.
+ // This can happened when the request/reply
+ // times out.
+ // To throw away all registered reply handlers is
+ // not the right thing, as there might be just one
+ // old reply coming in and several valid new ones
+ // pending. If we would invoke <connection_closed>
+ // we would throw away also the valid ones.
+ //return 0;
+ }
+
+
+ // This is a NOOP for the Exclusive request case, but it actually
+ // destroys the stream in the muxed case.
+ //this->tms_->destroy_message_state (message_state);
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+ {
+ return -1;
+ }
+
+ return 0;
+}
+
void
TAO_SHMIOP_Transport::transition_handler_state_i (void)
{