summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2001-07-03 20:47:05 +0000
committerbala <balanatarajan@users.noreply.github.com>2001-07-03 20:47:05 +0000
commit838d1b07dd512b7d383084a5e2dda149444f3eb2 (patch)
tree90d077878a3971002d31edfdd5217dd09e19ec66
parent9df8b621720ea9ea68f197ceda6eaff9d980a38d (diff)
downloadATCD-838d1b07dd512b7d383084a5e2dda149444f3eb2.tar.gz
ChangeLogTag:Tue Jul 03 03:45:03 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/tao/Resume_Handle.cpp3
-rw-r--r--TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp28
-rw-r--r--TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h23
-rw-r--r--TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h3
-rw-r--r--TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp37
-rw-r--r--TAO/tao/Strategies/SHMIOP_Connection_Handler.h2
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp237
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.h13
-rw-r--r--TAO/tao/Strategies/TAO_Strategies.dsp16
-rw-r--r--TAO/tao/Transport.h10
10 files changed, 144 insertions, 228 deletions
diff --git a/TAO/tao/Resume_Handle.cpp b/TAO/tao/Resume_Handle.cpp
index d5c9b1e9b24..af94eefa37e 100644
--- a/TAO/tao/Resume_Handle.cpp
+++ b/TAO/tao/Resume_Handle.cpp
@@ -13,7 +13,8 @@ TAO_Resume_Handle::resume_handle (void)
{
// If we have a complete message, just resume the handler
// Resume the handler.
- if (this->orb_core_->reactor ()->resumable_handler () &&
+ if (this->orb_core_ &&
+ this->orb_core_->reactor ()->resumable_handler () &&
this->flag_ == TAO_HANDLE_RESUMABLE &&
this->handle_ != ACE_INVALID_HANDLE)
this->orb_core_->reactor ()->resume_handler (this->handle_);
diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp
index 84f5a560fc2..1c4f044eebd 100644
--- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp
+++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp
@@ -8,10 +8,9 @@
ACE_RCSID (Strategies, GIOP_Message_NonReactive_Base, "$Id$")
-TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core,
- size_t buf_size)
- : TAO_GIOP_Message_Base (orb_core, buf_size),
- message_handler_ (orb_core, this, buf_size)
+TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core)
+
+ : TAO_GIOP_Message_Base (orb_core)
{
}
@@ -19,14 +18,18 @@ TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Co
int
TAO_GIOP_Message_NonReactive_Base::read_message (TAO_Transport *transport,
+ ACE_Message_Block &block,
int /*block*/,
ACE_Time_Value *max_wait_time)
{
// Call the handler to read and do a simple parse of the header of
// the message.
int retval =
- this->message_handler_.read_parse_message (transport,
- max_wait_time);
+ this->read_data (transport,
+ max_wait_time);
+
+ // Before we do this let us reset the
+ char *buf = this->input_cdr_.rd_ptr ();
// Error in the message that was received
@@ -61,6 +64,19 @@ TAO_GIOP_Message_NonReactive_Base::read_message (TAO_Transport *transport,
return 2;
}
+
+size_t
+TAO_GIOP_Message_NonReactive_Base::read_data (TAO_Transport *transport,
+ ACE_Time_Value *time)
+{
+
+ transport->recv (buf,
+ n,
+ max_wait_time);
+
+}
+
+
TAO_Pluggable_Message_Type
TAO_GIOP_Message_NonReactive_Base::message_type (void)
{
diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h
index d6e8b434ecd..c7ddb348117 100644
--- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h
+++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h
@@ -28,21 +28,23 @@ class TAO_Pluggable_Reply_Params;
/**
* @class TAO_GIOP_Message_NonReactive_Base
*
- * @brief Uses the NonReactive handler class for reading messages.
+ * @brief Uses the NonReactive mechanism to read and process
+ * messages.
*
- * This class uses the TAO_GIOP_Message_NonReactive_Handler class to
- * read and parse messages. This class derives from
- * TAO_GIOP_Message_Base. It just redirects most of the functions to
- * the base class but just acts as a sort of place holder for the
- * NonReactive handler class.
+ * Some protocols based on shared memory cannot make use of the
+ * reactor as other protocols based on TCP/IP. This class is a relief
+ * for such protocols. This effectively does the following
+ * - reads the GIOP header out of the transport
+ * - processes the header to determine the length of the message
+ * and other details.
+ * - reads the body of the message from the transport
+ * - passes the data to the base class for making the upcall.
*/
class TAO_Strategies_Export TAO_GIOP_Message_NonReactive_Base :public TAO_GIOP_Message_Base
{
public:
- friend class TAO_GIOP_Message_NonReactive_Handler;
-
/// Constructor
TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core,
size_t cdr_size = ACE_CDR::DEFAULT_BUFSIZE);
@@ -82,9 +84,8 @@ public:
private:
- /// Thr message handler object that does reading and parsing of the
- /// incoming messages
- TAO_GIOP_Message_NonReactive_Handler message_handler_;
+ /// The input cdr stream in which the incoming data is stored.
+ TAO_InputCDR input_cdr_;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h b/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h
index 02b93da7d47..a8d9642a1b1 100644
--- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h
+++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h
@@ -77,8 +77,7 @@ private:
/// Our Message base
TAO_GIOP_Message_NonReactive_Base *mesg_base_;
- /// The input cdr stream in which the incoming data is stored.
- TAO_InputCDR input_cdr_;
+
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
index fd7fb9db6e6..6ab063aa2e8 100644
--- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
@@ -10,12 +10,11 @@
#include "tao/ORB.h"
#include "tao/CDR.h"
#include "tao/Messaging_Policy_i.h"
-#include "tao/GIOP_Message_Base.h"
-#include "tao/GIOP_Message_Lite.h"
#include "tao/Server_Strategy_Factory.h"
#include "tao/Base_Transport_Property.h"
#include "tao/Transport_Cache_Manager.h"
#include "SHMIOP_Endpoint.h"
+#include "tao/Resume_Handle.h"
#if !defined (__ACE_INLINE__)
# include "SHMIOP_Connection_Handler.inl"
@@ -251,42 +250,26 @@ TAO_SHMIOP_Connection_Handler::add_transport_to_cache (void)
int
TAO_SHMIOP_Connection_Handler::handle_input (ACE_HANDLE h)
{
- return this->handle_input_i (h);
-}
-
-
-int
-TAO_SHMIOP_Connection_Handler::handle_input_i (ACE_HANDLE,
- ACE_Time_Value *max_wait_time)
-{
+ // Increase the reference count on the upcall that have passed us.
this->pending_upcalls_++;
- // Call the transport read the message
- int result = this->transport ()->read_process_message (max_wait_time);
-
- // Now the message has been read
- if (result == -1 && TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("SHMIOP_Connection_Handler::read_message \n")));
+ TAO_Resume_Handle resume_handle (this->orb_core (),
+ h);
- }
+ int retval = this->transport ()->handle_input_i (resume_handle);
// The upcall is done. Bump down the reference count
if (--this->pending_upcalls_ <= 0)
- result = -1;
-
- if (result == 0 || result == -1)
- {
- return result;
- }
+ retval = -1;
- return 0;
+ return retval;
}
+
+
+
// ****************************************************************
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h
index cf9c2fe3549..59a0469cdbc 100644
--- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h
+++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h
@@ -104,8 +104,6 @@ protected:
/// ensure that server threads eventually exit.
virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
- virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Time_Value *max_wait_time = 0);
private:
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index 81c803e0a41..8e3fd644f43 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -15,9 +15,9 @@
#include "tao/Stub.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"
+#include "tao/Resume_Handle.h"
+#include "tao/GIOP_Message_Base.h"
-#include "tao/GIOP_Message_Lite.h"
-#include "GIOP_Message_NonReactive_Base.h"
#if !defined (__ACE_INLINE__)
# include "SHMIOP_Transport.i"
@@ -33,6 +33,7 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl
connection_handler_ (handler),
messaging_object_ (0)
{
+#if 0
if (flag)
{
// Use the lite version of the protocol
@@ -40,10 +41,11 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl
TAO_GIOP_Message_Lite (orb_core));
}
else
+#endif /*#if 0 */
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_NonReactive_Base (orb_core));
+ TAO_GIOP_Message_Base (orb_core));
}
}
@@ -91,42 +93,100 @@ TAO_SHMIOP_Transport::recv_i (char *buf,
size_t len,
const ACE_Time_Value *max_wait_time)
{
- return this->connection_handler_->peer ().recv (buf,
- len,
- max_wait_time);
+ ssize_t n = 0;
+
+ int read_break = 0;
+
+ while (!read_break)
+ {
+ n = this->connection_handler_->peer ().recv (buf,
+ len,
+ max_wait_time);
+
+ // If we get a EWOULBLOCK we try to read again.
+ if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+ {
+ n = 0;
+ continue;
+ }
+
+ // If there is anything else we just drop out of the loop.
+ read_break = 1;
+ }
+
+ if (n == 0 || n == -1)
+ {
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure ")
+ ACE_TEXT ("recv_i () \n")));
+ }
+ }
+
+ return n;
+
}
+
int
-TAO_SHMIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
- int block)
+TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
{
- // Read the message of the socket
- int result = this->messaging_object_->read_message (this,
- block,
- max_wait_time);
-
- if (result == -1)
+ // Calculate the actual length of the load that we are supposed to
+ // read which is equal to the <missing_data> + length of the buffer
+ // that we have..
+ size_t payload = missing_data + incoming.length ();
+
+ // Grow the buffer to the size of the message
+ ACE_CDR::grow (&incoming,
+ payload);
+
+ // .. do a read on the socket again.
+ ssize_t bytes = 0;
+
+ // As this used for transports where things are available in one
+ // shot this looping should not create any problems.
+ for (size_t n = payload;
+ n != 0;
+ n -= bytes)
{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("SHMIOP_Transport::read_message, failure in read_message ()")));
+ // We would have liked to use something like a recv_n ()
+ // here. But at the time when the code was written, the MEM_Stream
+ // classes had poor support for recv_n (). Till a day when we
+ // get proper recv_n (), let us stick with this. The other
+ // argument that can be said against this is that, this is the
+ // bad layer in which this is being done ie. recv_n is
+ // simulated. But...
+ bytes = this->recv (incoming.wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ if (bytes == 0 ||
+ bytes == -1)
+ {
+ return -1;
+ }
- this->tms_->connection_closed ();
- return -1;
+ incoming.wr_ptr (bytes);
}
- if (result < 2)
- return result;
- // Now we know that we have been able to read the complete message
- // here.. We loop here to see whether we have read more than one
- // message in our read.
+ TAO_Queued_Data pqd (&incoming);
- // See we use the reactor semantics again
- result = this->process_message ();
+ // With SHMIOP we would not have any missing data...
+ pqd.missing_data_ = 0;
+ this->messaging_object ()->get_message_data (&pqd);
- return result;
+ // Resume the handle before processing the request
+ rh.resume_handle ();
+
+ // Now we have a full message in our buffer. Just go ahead and
+ // process that
+ return this->process_parsed_messages (&pqd);
}
@@ -220,127 +280,6 @@ TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major,
return 1;
}
-int
-TAO_SHMIOP_Transport::process_message (void)
-{
- // Check whether we have messages for processing
- int retval =
- this->messaging_object_->more_messages ();
-
- // The messages are fragmented, so we go back to the reactor.
- if (retval <= 0)
- return retval;
-
- // Get the <message_type> that we have received
- TAO_Pluggable_Message_Type t =
- this->messaging_object_->message_type ();
-
-
- int result = 0;
- if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("Close Connection Message recd \n")));
-
- this->tms_->connection_closed ();
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
- {
- if (this->messaging_object_->process_request_message (this,
- this->orb_core ()) == -1)
- return -1;
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
- {
- TAO_Pluggable_Reply_Params params (this->orb_core ());
- if (this->messaging_object_->process_reply_message (params) == -1)
- {
-
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("SHMIOP_Transport::process_message, process_reply_message ()")));
-
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
-
- result =
- this->tms_->dispatch_reply (params);
-
- // @@ Somehow it seems dangerous to reset the state *after*
- // dispatching the request, what if another threads receives
- // another reply in the same connection?
- // My guess is that it works as follows:
- // - For the exclusive case there can be no such thread.
- // - The the muxed case each thread has its own message_state.
- // I'm pretty sure this comment is right. Could somebody else
- // please look at it and confirm my guess?
-
- // @@ The above comment was found in the older versions of the
- // code. The code was also written in such a way that, when
- // the client thread on a call from handle_input () from the
- // reactor a call would be made on the handle_client_input
- // (). The implementation of handle_client_input () looked so
- // flaky. It used to create a message state upon entry in to
- // the function using the TMS and destroy that on exit. All
- // this was fine _theoretically_ for multiple threads. But
- // the flakiness was originating in the implementation of
- // get_message_state () where we were creating message state
- // only once and dishing it out for every thread till one of
- // them destroy's it. So, it looked broken. That has been
- // changed. Why?. To my knowledge, the reactor does not call
- // handle_input () on two threads at the same time. So, IMHO
- // that defeats the purpose of creating a message state for
- // every thread. This is just my guess. If we run in to
- // problems this place needs to be revisited. If someone else
- // is going to take a look please contact bala@cs.wustl.edu
- // for details on this-- Bala
-
- if (result == -1)
- {
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) : SHMIOP_Transport::")
- ACE_TEXT ("process_message - ")
- ACE_TEXT ("dispatch reply failed\n")));
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
- if (result == 0)
- {
- this->messaging_object_->reset ();
-
- // The reply dispatcher was no longer registered.
- // This can happened when the request/reply
- // times out.
- // To throw away all registered reply handlers is
- // not the right thing, as there might be just one
- // old reply coming in and several valid new ones
- // pending. If we would invoke <connection_closed>
- // we would throw away also the valid ones.
- //return 0;
- }
-
-
- // This is a NOOP for the Exclusive request case, but it actually
- // destroys the stream in the muxed case.
- //this->tms_->destroy_message_state (message_state);
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- {
- return -1;
- }
-
- return 0;
-}
-
void
TAO_SHMIOP_Transport::transition_handler_state_i (void)
{
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h
index 60812057b5b..61785ef6215 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.h
+++ b/TAO/tao/Strategies/SHMIOP_Transport.h
@@ -78,11 +78,10 @@ protected:
size_t len,
const ACE_Time_Value *s = 0);
- /// Read and process the message from the connection. The processing
- /// of the message is done by delegating the work to the underlying
- /// messaging object
- virtual int read_process_message (ACE_Time_Value *max_time_value = 0,
- int block =0);
+ virtual int consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time);
virtual int register_handler_i (void);
@@ -109,10 +108,6 @@ public:
CORBA::Octet minor);
private:
- /// Process the message that we have read
- int process_message (void);
-
-private:
/// The connection service handler used for accessing lower layer
/// communication protocols.
TAO_SHMIOP_Connection_Handler *connection_handler_;
diff --git a/TAO/tao/Strategies/TAO_Strategies.dsp b/TAO/tao/Strategies/TAO_Strategies.dsp
index 3b836f5bd2e..8056375a565 100644
--- a/TAO/tao/Strategies/TAO_Strategies.dsp
+++ b/TAO/tao/Strategies/TAO_Strategies.dsp
@@ -102,14 +102,6 @@ SOURCE=.\FIFO_Connection_Purging_Strategy.cpp
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_NonReactive_Base.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_NonReactive_Handler.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\LFU_Connection_Purging_Strategy.cpp
# End Source File
# Begin Source File
@@ -198,14 +190,6 @@ SOURCE=.\advanced_resource.h
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_NonReactive_Base.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_NonReactive_Handler.h
-# End Source File
-# Begin Source File
-
SOURCE=.\Reactor_Per_Priority.h
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 98ac3bdb8d1..d683667ee46 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -599,10 +599,10 @@ protected:
int check_message_integrity (ACE_Message_Block &message_block);
- int consolidate_message (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time);
+ virtual int consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time);
int consolidate_message_queue (ACE_Message_Block &incoming,
ssize_t missing_data,
@@ -613,7 +613,7 @@ protected:
TAO_Resume_Handle &rh);
/// @@ Bala: Documentation
- virtual int process_parsed_messages (TAO_Queued_Data *qd);
+ int process_parsed_messages (TAO_Queued_Data *qd);
public:
/// Method for the connection handler to signify that it