summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-05-30 04:01:04 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-05-30 04:01:04 +0000
commit1323714cde8a8dbf37f829315e0f45146a8408dd (patch)
treec3ce1aeee837b3eafcfbf99a644b60330a961945
parentef7dab8ccd69d8506770ce5428de3d2c3b540bd3 (diff)
downloadATCD-1323714cde8a8dbf37f829315e0f45146a8408dd.tar.gz
ChangeLogTag:Sat May 29 22:49:10 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
-rw-r--r--TAO/ChangeLog-99c43
-rw-r--r--TAO/tao/Connect.cpp50
-rw-r--r--TAO/tao/IIOP_Transport.cpp38
-rw-r--r--TAO/tao/IIOP_Transport.h3
-rw-r--r--TAO/tao/Invocation.cpp19
-rw-r--r--TAO/tao/ORB_Core.cpp4
-rw-r--r--TAO/tao/Pluggable.cpp12
-rw-r--r--TAO/tao/Pluggable.h52
-rw-r--r--TAO/tao/Request_Mux_Strategy.cpp4
-rw-r--r--TAO/tao/Request_Mux_Strategy.h6
-rw-r--r--TAO/tao/Wait_Strategy.cpp183
-rw-r--r--TAO/tao/Wait_Strategy.h22
12 files changed, 308 insertions, 128 deletions
diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c
index d733b4ab15e..b1ce73cd857 100644
--- a/TAO/ChangeLog-99c
+++ b/TAO/ChangeLog-99c
@@ -1,3 +1,46 @@
+Sat May 29 22:49:10 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
+
+ * tao/Wait_Strategy.h:
+ * tao/Wait_Strategy.cpp:
+ The Wait_On_Reactor strategy was not passing up errors to the
+ user.
+ Don't hold the leader-follower lock while sending the request.
+ Don't forget to reset reply_received when sending a request.
+ Added a *lot* of debugging messages trying to fix the LF
+ strategy, they should go away on the production version.
+ New helper method for handle_input() and handle_close(), it is
+ used to decide if we should wake up any sleeping threads.
+ Removed obsolete @@ comments.
+
+ * tao/Connect.cpp:
+ Re-organized some of the debugging code so it will be easier to
+ remove/compile out.
+ Inform the Transport when the connection closes, for example.
+
+ * tao/Pluggable.h:
+ * tao/Pluggable.cpp:
+ * tao/IIOP_Transport.h:
+ * tao/IIOP_Transport.cpp:
+ New handle_close() method used to detect a connection going
+ down, the transport can use that to raise exceptions on all
+ waiting threads (or Reply_Handlers).
+ More debugging messages.
+ Changed signature of destroy_cdr_stream()
+
+ * tao/Invocation.cpp:
+ If send() fails then cleanup the transport ASAP.
+ Bind the request id, dispatcher and CDR stream *before* sending
+ the request.
+ If send() fails report back to the user.
+
+ * tao/ORB_Core.cpp:
+ The leader-follower lock is held by the user during the new
+ leader election, otherwise some nasty race conditions creep in.
+
+ * tao/Request_Mux_Strategy.h:
+ * tao/Request_Mux_Strategy.cpp:
+ Changed signature of destroy_cdr_stream()
+
Sat May 29 22:35:38 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
* tests/NestedUpcall/Reactor/Makefile:
diff --git a/TAO/tao/Connect.cpp b/TAO/tao/Connect.cpp
index aac5e5382b6..0646f05b37b 100644
--- a/TAO/tao/Connect.cpp
+++ b/TAO/tao/Connect.cpp
@@ -107,13 +107,6 @@ TAO_Server_Connection_Handler::transport (void)
int
TAO_Server_Connection_Handler::open (void*)
{
- // Called by the <Strategy_Acceptor> when the handler is completely
- // connected.
- ACE_INET_Addr addr;
-
- if (this->peer ().get_remote_addr (addr) == -1)
- return -1;
-
#if !defined (ACE_LACKS_SOCKET_BUFSIZ)
int sndbufsize =
this->orb_core_->orb_params ()->sock_sndbuf_size ();
@@ -148,15 +141,21 @@ TAO_Server_Connection_Handler::open (void*)
// operation fails we are out of luck (some platforms do not support
// it and return -1).
- char client[MAXHOSTNAMELEN + 1];
+ // Called by the <Strategy_Acceptor> when the handler is completely
+ // connected.
+ ACE_INET_Addr addr;
- if (addr.get_host_name (client, MAXHOSTNAMELEN) == -1)
- addr.addr_to_string (client, sizeof (client));
+ if (this->peer ().get_remote_addr (addr) == -1)
+ return -1;
- if (TAO_orbdebug)
+ char client[MAXHOSTNAMELEN + 16];
+
+ (void) addr.addr_to_string (client, sizeof (client));
+
+ if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) connection from client %s\n",
- client));
+ "(%P|%t) connection from client <%s> on %d\n",
+ client, this->peer ().get_handle ()));
return 0;
}
@@ -883,6 +882,22 @@ TAO_Client_Connection_Handler::open (void *)
// operation fails we are out of luck (some platforms do not support
// it and return -1).
+ // Called by the <Strategy_Acceptor> when the handler is completely
+ // connected.
+ ACE_INET_Addr addr;
+
+ if (this->peer ().get_remote_addr (addr) == -1)
+ return -1;
+
+ char server[MAXHOSTNAMELEN + 16];
+
+ (void) addr.addr_to_string (server, sizeof (server));
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) connection to server <%s> on %d\n",
+ server, this->peer ().get_handle ()));
+
// Register the handler with the Reactor if necessary.
return this->transport ()->wait_strategy ()->register_handler ();
}
@@ -905,11 +920,10 @@ TAO_Client_Connection_Handler::handle_close (ACE_HANDLE handle,
// in turn take appropiate action (such as sending exceptions to
// all waiting reply handlers).
- if (TAO_orbdebug)
+ if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) TAO_Client_Connection_Handler::handle_close (%d, %d)\n",
- handle,
- rm));
+ "(%P|%t) TAO_Client_Connection_Handler::"
+ "handle_close (%d, %d)\n", handle, rm));
if (this->recycler ())
this->recycler ()->mark_as_closed (this->recycling_act ());
@@ -929,6 +943,8 @@ TAO_Client_Connection_Handler::handle_close (ACE_HANDLE handle,
this->peer ().close ();
+ this->transport ()->handle_close ();
+
return 0;
}
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index b76c4bdeeee..abdb9918af2 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -245,10 +245,18 @@ TAO_IIOP_Client_Transport::handle_client_input (int block)
case TAO_GIOP::MessageError:
// Handle errors like these.
// @@ this->reply_handler_->error ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) %N:%l handle_client_input: "
+ "error on stream.\n"),
+ -1);
return -1;
case TAO_GIOP::Fragment:
// Handle this.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) %N:%l handle_client_input: "
+ "fragment.\n"),
+ -1);
return -1;
case TAO_GIOP::Request:
@@ -257,6 +265,10 @@ TAO_IIOP_Client_Transport::handle_client_input (int block)
// on the firt iteration, leave it for the nearby future...
// ERROR too.
// @@ this->reply_handler_->error ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) %N:%l handle_client_input: "
+ "request.\n"),
+ -1);
return -1;
case TAO_GIOP::CancelRequest:
@@ -264,6 +276,10 @@ TAO_IIOP_Client_Transport::handle_client_input (int block)
case TAO_GIOP::CloseConnection:
// @@ Errors for the time being.
// @@ this->reply_handler_->error ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) %N:%l handle_client_input: "
+ "wrong message.\n"),
+ -1);
return -1;
case TAO_GIOP::LocateReply:
@@ -324,11 +340,16 @@ TAO_IIOP_Client_Transport::handle_client_input (int block)
// Handle the reply.
if (reply_dispatcher->dispatch_reply () == -1)
- return -1;
-
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) %N:%l handle_client_input: "
+ "dispatch reply.\n"),
+ -1);
+ return -1;
+ }
// This is a NOOP for the Exclusive request case, but it actually
// destroys the stream in the muxed case.
- this->rms_->destroy_cdr_stream ();
+ this->destroy_cdr_stream (cdr);
// Return something to indicate the reply is received.
return 1;
@@ -360,6 +381,14 @@ TAO_IIOP_Client_Transport::resume_handler (void)
}
int
+TAO_IIOP_Client_Transport::handle_close (void)
+{
+ this->wait_strategy ()->handle_close ();
+ // @@ Should we? : this->rms_->handle_close ();
+ return 0;
+}
+
+int
TAO_IIOP_Client_Transport::check_unexpected_data (void)
{
// @@ Alex: This should *not* be part of the client connection
@@ -386,6 +415,9 @@ TAO_IIOP_Client_Transport::check_unexpected_data (void)
// Both will result in us returning -1 and this connection
// getting closed
//
+ // if (errno == EWOULDBLOCK)
+ // return 0;
+
if (TAO_debug_level)
ACE_DEBUG ((LM_WARNING,
"TAO_IIOP_Client_Transport::check_unexpected_data: "
diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h
index 210cffdb6ba..01bafcec28a 100644
--- a/TAO/tao/IIOP_Transport.h
+++ b/TAO/tao/IIOP_Transport.h
@@ -161,6 +161,9 @@ public:
// Resume the handler from the reactor. This will be called by the
// Wait Strategy if Reactor is used for that strategy.
+ virtual int handle_close (void);
+ // The connection was closed, let everybody know about it....
+
protected:
int check_unexpected_data (void);
// This method checks for unexpected data.
diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp
index f279997ea0e..0544cffce70 100644
--- a/TAO/tao/Invocation.cpp
+++ b/TAO/tao/Invocation.cpp
@@ -393,6 +393,8 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip,
{
// send_request () closed the connection, we just have to forget
// about the hint.
+ this->transport_ = 0;
+
this->profile_->reset_hint ();
return TAO_INVOKE_RESTART;
@@ -683,10 +685,6 @@ int
TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- // Just send the request, without trying to wait for the reply.
- int retval = TAO_GIOP_Invocation::invoke (1, ACE_TRY_ENV);
- ACE_CHECK_RETURN (retval);
-
// Give the CDR stream for reading the input.
this->transport_->input_cdr_stream (&this->inp_stream_);
@@ -697,17 +695,24 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV)
this->rd_.request_id (this->request_id_);
// Bind.
- retval = this->transport_->bind_reply_dispatcher (this->request_id_,
- &this->rd_);
+ int retval = this->transport_->bind_reply_dispatcher (this->request_id_,
+ &this->rd_);
if (retval == -1)
{
// @@ What is the right way to handle this error?
this->close_connection ();
ACE_THROW_RETURN (CORBA::INTERNAL (TAO_DEFAULT_MINOR_CODE,
- CORBA::COMPLETED_MAYBE),
+ CORBA::COMPLETED_NO),
TAO_INVOKE_EXCEPTION);
}
+ // Just send the request, without trying to wait for the reply.
+ retval = TAO_GIOP_Invocation::invoke (1, ACE_TRY_ENV);
+ ACE_CHECK_RETURN (retval);
+
+ if (retval != TAO_INVOKE_OK)
+ return retval;
+
// This blocks until the response is read. In the current version,
// there is only one client thread that ever uses this connection,
// so most response messages are illegal.
diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp
index 0b6985da2dc..d7e34d93ce1 100644
--- a/TAO/tao/ORB_Core.cpp
+++ b/TAO/tao/ORB_Core.cpp
@@ -999,8 +999,8 @@ int
TAO_ORB_Core::unset_leader_wake_up_follower (void)
// sets the leader_available flag to false and tries to wake up a follower
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon,
- this->leader_follower_lock (), -1);
+ // ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon,
+ // this->leader_follower_lock (), -1);
this->unset_leader_thread ();
diff --git a/TAO/tao/Pluggable.cpp b/TAO/tao/Pluggable.cpp
index 1286009e1d1..287f1d678b8 100644
--- a/TAO/tao/Pluggable.cpp
+++ b/TAO/tao/Pluggable.cpp
@@ -63,6 +63,12 @@ TAO_Transport::input_cdr_stream (void) const
return this->rms_->get_cdr_stream ();
}
+void
+TAO_Transport::destroy_cdr_stream (TAO_InputCDR *cdr) const
+{
+ this->rms_->destroy_cdr_stream (cdr);
+}
+
// Set the total size of the incoming message. (This does not
// include the header size).
void
@@ -186,6 +192,12 @@ TAO_Transport::resume_handler (void)
}
int
+TAO_Transport::handle_close (void)
+{
+ ACE_NOTSUP_RETURN (-1);
+}
+
+int
TAO_Transport::wait_for_reply (void)
{
return this->ws_->wait ();
diff --git a/TAO/tao/Pluggable.h b/TAO/tao/Pluggable.h
index 278d86bbcbc..b99bba7616a 100644
--- a/TAO/tao/Pluggable.h
+++ b/TAO/tao/Pluggable.h
@@ -147,30 +147,33 @@ public:
void input_cdr_stream (TAO_InputCDR *cdr);
// Set the CDR stream for reading the input message.
-
+
TAO_InputCDR *input_cdr_stream (void) const;
// Get the CDR stream for reading the input message.
+ void destroy_cdr_stream (TAO_InputCDR *) const;
+ // Release a CDR stream, simply pass it to the RMS...
+
// = State of the incoming message.
-
+
void message_size (CORBA::ULong message_size);
// Set the total size of the incoming message. (This does not
// include the header size). This inits the <message_offset> setting
- // it to zero.
-
+ // it to zero.
+
CORBA::ULong message_size (void) const;
// Get the total size of the incoming message.
-
+
CORBA::ULong message_offset (void) const;
// Get the current offset of the incoming message.
-
+
int incr_message_offset (CORBA::Long bytes_transferred);
// Update the offset of the incoming message. Returns 0 on success
// -1 on failure.
void message_received (int received);
// Set the flag to indicate whether the input message was read fully
- // or no.
+ // or no.
int message_received (void) const;
// Get the flag.
@@ -179,7 +182,7 @@ public:
// void orb_core (TAO_ORB_Core *orb_core);
// Set it.
-
+
TAO_ORB_Core *orb_core (void) const;
// Get it.
@@ -187,20 +190,20 @@ public:
// void rms (TAO_Request_Mux_Strategy *rms);
// Set the RMS object.
-
+
TAO_Request_Mux_Strategy * rms (void) const;
// Get the RMS used by this Transport object.
-
+
TAO_Wait_Strategy *wait_strategy (void) const;
// Return the Wait strategy used by the Transport.
CORBA::ULong request_id (void);
- // Get request id for the current invocation from the RMS object.
-
+ // Get request id for the current invocation from the RMS object.
+
int bind_reply_dispatcher (CORBA::ULong request_id,
TAO_Reply_Dispatcher *rd);
// Bind the reply dispatcher with the RMS object.
-
+
virtual int wait_for_reply (void);
// Wait for the reply depending on the strategy.
@@ -210,7 +213,7 @@ public:
// Read and handle the reply. Returns 0 when there is Short Read on
// the connection. Returns 1 when the full reply is read and
// handled. Returns -1 on errors.
- // If <block> is 1, then reply is read in a blocking manner.
+ // If <block> is 1, then reply is read in a blocking manner.
virtual int register_handler (void);
// Register the handler with the reactor. Will be called by the Wait
@@ -218,40 +221,43 @@ public:
// implementation out here returns -1 setting <errno> to ENOTSUP.
virtual int suspend_handler (void);
- // Suspend the handler from the reactor. Will be called by the Wait
+ // Suspend the handler from the reactor. Will be called by the Wait
// Strategy if Reactor is used for that strategy. Default
// implementation out here returns -1 setting <errno> to ENOTSUP.
virtual int resume_handler (void);
// Resume the handler from the reactor. This will be called by the
// Wait Strategies, if Reactor is used in the strategy. Default
- // implementation out here returns -1 setting <errno> to ENOTSUP.
+ // implementation out here returns -1 setting <errno> to ENOTSUP.
+
+ virtual int handle_close (void);
+ // The connection was closed, let everybody know about it....
protected:
// = States for the input message.
-
+
CORBA::ULong message_size_;
// Total length of the whole message. This does not include the
// header length.
-
+
CORBA::ULong message_offset_;
// Current offset of the input message.
int message_received_;
// Flag to indicate whether the input message has been received
// fully or not.
-
-
+
+
TAO_Request_Mux_Strategy *rms_;
// Strategy to decide whether multiple requests can be sent over the
// same connection or the connection is exclusive for a request.
-
+
TAO_Wait_Strategy *ws_;
- // Strategy for waiting for the reply after sending the request.
+ // Strategy for waiting for the reply after sending the request.
TAO_ORB_Core *orb_core_;
// Global orbcore resource.
-
+
TAO_GIOP_Version version_;
// Version information found in the incoming message.
};
diff --git a/TAO/tao/Request_Mux_Strategy.cpp b/TAO/tao/Request_Mux_Strategy.cpp
index d7836021186..4cd55076f00 100644
--- a/TAO/tao/Request_Mux_Strategy.cpp
+++ b/TAO/tao/Request_Mux_Strategy.cpp
@@ -70,7 +70,7 @@ TAO_Muxed_RMS::set_cdr_stream (TAO_InputCDR *Cdr)
void
-TAO_Muxed_RMS::destroy_cdr_stream (void)
+TAO_Muxed_RMS::destroy_cdr_stream (TAO_InputCDR *)
{
// @@ Implement.
// delete cdr;
@@ -132,7 +132,7 @@ TAO_Exclusive_RMS::set_cdr_stream (TAO_InputCDR *cdr)
// NOOP function.
void
-TAO_Exclusive_RMS::destroy_cdr_stream (void)
+TAO_Exclusive_RMS::destroy_cdr_stream (TAO_InputCDR *)
{
this->cdr_ = 0;
}
diff --git a/TAO/tao/Request_Mux_Strategy.h b/TAO/tao/Request_Mux_Strategy.h
index 363b6539873..900f63a300e 100644
--- a/TAO/tao/Request_Mux_Strategy.h
+++ b/TAO/tao/Request_Mux_Strategy.h
@@ -69,7 +69,7 @@ public:
virtual TAO_InputCDR *get_cdr_stream (void);
// Get the CDR stream.
- virtual void destroy_cdr_stream (void) = 0;
+ virtual void destroy_cdr_stream (TAO_InputCDR *) = 0;
// Destroy the CDR stream.
protected:
@@ -115,7 +115,7 @@ public:
// virtual TAO_InputCDR *cdr_stream (void);
// Get the CDR stream.
- virtual void destroy_cdr_stream (void);
+ virtual void destroy_cdr_stream (TAO_InputCDR *);
// Delete the cdr stream.
protected:
@@ -159,7 +159,7 @@ public:
// virtual TAO_InputCDR *cdr_stream (void);
// Get the CDR stream.
- virtual void destroy_cdr_stream (void);
+ virtual void destroy_cdr_stream (TAO_InputCDR *);
// NO-OP function.
protected:
diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp
index 93b34738f1d..f7f5e19ac58 100644
--- a/TAO/tao/Wait_Strategy.cpp
+++ b/TAO/tao/Wait_Strategy.cpp
@@ -3,6 +3,7 @@
#include "tao/Wait_Strategy.h"
#include "tao/Pluggable.h"
#include "tao/ORB_Core.h"
+#include "tao/debug.h"
ACE_RCSID(tao, Wait_Strategy, "$Id$")
@@ -72,7 +73,7 @@ TAO_Wait_On_Reactor::wait (void)
result = reactor->handle_events (/* timeout */);
}
- if (result == -1)
+ if (result == -1 || this->reply_received_ == -1)
return -1;
return 0;
@@ -83,12 +84,19 @@ TAO_Wait_On_Reactor::handle_input (void)
{
int result = this->transport_->handle_client_input (0);
+ if (result == 1)
+ this->reply_received_ = 1;
+
if (result == -1)
return -1;
- if (result == 1)
- this->reply_received_ = 1;
+ return 0;
+}
+int
+TAO_Wait_On_Reactor::handle_close (void)
+{
+ this->reply_received_ = -1;
return 0;
}
@@ -139,33 +147,48 @@ TAO_Wait_On_Leader_Follower::send_request (TAO_ORB_Core *orb_core,
stream,
two_way);
}
- else
- {
- // = Two way call.
- // @@ Should we do here that checking for the difference in the
- // Reactor used??? (Alex).
+ // = Two way call.
- // Register the handler.
- this->transport_->register_handler ();
- // @@ Carlos: We do this only if the reactor is different right?
- // (Alex)
+ // @@ Should we do here that checking for the difference in the
+ // Reactor used??? (Alex).
- // Obtain the lock.
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon,
- orb_core->leader_follower_lock (), -1);
+ // Register the handler.
+ this->transport_->register_handler ();
+ // @@ Carlos: We do this only if the reactor is different right?
+ // (Alex)
+ // @@ Alex: that is taken care of in
+ // IIOP_Transport::register_handler, but maybe we shouldn't do this
+ // checking everytime, I recall that there was a problem (sometime
+ // ago) about using the wrong ORB core, but that may have been
+ // fixed...
- // Set the state so that we know we're looking for a response.
- this->expecting_response_ = 1;
+ // Obtain the lock.
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon,
+ orb_core->leader_follower_lock (), -1);
- // remember in which thread the client connection handler was running
- this->calling_thread_ = ACE_Thread::self ();
+ // The last request may have left this unitialized
+ this->reply_received_ = 0;
- // Send the request
- return TAO_Wait_Strategy::send_request (orb_core,
- stream,
- two_way);
- }
+ // Set the state so that we know we're looking for a response.
+ this->expecting_response_ = 1;
+
+ // remember in which thread the client connection handler was running
+ this->calling_thread_ = ACE_Thread::self ();
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Wait_On_LF::send_request "
+ "expecting reply for <%x:%d>\n",
+ this, this->transport_->handle ()));
+
+ }
+
+ // Send the request
+ return TAO_Wait_Strategy::send_request (orb_core,
+ stream,
+ two_way);
}
int
@@ -223,7 +246,8 @@ TAO_Wait_On_Leader_Follower::wait (void)
if (orb_core->add_follower (cond) == -1)
ACE_ERROR ((LM_ERROR,
"TAO:%N:%l:(%P|%t):TAO_Wait_On_Leader_Follower::wait: "
- "Failed to add a follower thread\n"));
+ "Failed to add a follower <%x>\n",
+ cond));
while (!this->reply_received_ && orb_core->leader_available ())
{
@@ -269,6 +293,9 @@ TAO_Wait_On_Leader_Follower::wait (void)
// This might increase the refcount of the leader.
orb_core->set_leader_thread ();
+ // ACE_DEBUG ((LM_DEBUG,
+ // "TAO (%P|%t) - become the leader\n"));
+
// Release the lock.
if (ace_mon.release () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -286,12 +313,22 @@ TAO_Wait_On_Leader_Follower::wait (void)
while (result >= 0 && this->reply_received_ == 0)
result = orb_core->reactor ()->handle_events ();
+ // Re-acquire the lock.
+ if (ace_mon.acquire () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO:%N:%l:(%P|%t): TAO_Wait_On_Leader_Follower::wait: "
+ "Failed to acquire the lock.\n"),
+ -1);
+
// Wake up the next leader, we cannot do that in handle_input,
// because the woken up thread would try to get into
// handle_events, which is at the time in handle_input still
// occupied. But do it before checking the error in <result>, even
// if there is an error in our input we should continue running the
- // loop in anothe rthread.
+ // loop in another thread.
+
+ // ACE_DEBUG ((LM_DEBUG,
+ // "TAO (%P|%t) - elect a follower\n"));
if (orb_core->unset_leader_wake_up_follower () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -337,10 +374,21 @@ TAO_Wait_On_Leader_Follower::handle_input (void)
// @@ Alex: this could be a CloseConnection message or something
// similar, has to be handled...
if (!this->expecting_response_)
- return -1;
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Wait_On_LF::handle_input, "
+ "unexpected <%x:%d>\n",
+ this, this->transport_->handle ()));
+ return -1;
+ }
// Receive any data that is available, without blocking...
int result = this->transport_->handle_client_input (0);
+ if (result == -1 && TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Wait_On_LF::handle_input, "
+ "handle_client_input == -1\n"));
// Data was read, but there the reply has not been completely
// received...
@@ -348,50 +396,28 @@ TAO_Wait_On_Leader_Follower::handle_input (void)
return 0;
// Severe error, abort....
- if (result == -1)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - L-F error while waiting on %d\n",
- this->transport_->handle ()));
-
- this->reply_received_ = -1;
- }
- else
+ if (result == 1)
{
- // All the data is here!
this->reply_received_ = 1;
result = 0;
}
- if (ACE_OS::thr_equal (this->calling_thread_, ACE_Thread::self ()))
- {
- // We are the leader thread, simply return 0 to terminate the
- // event loop....
- return 0;
- }
-
- // We are not the leader thread, but we have our data, wake up
- // ourselves and then return 0 so the leader thread can continue
- // doing its job....
-
- // At this point we might fail to remove the follower, because
- // it has been already chosen to become the leader, so it is
- // awake and will get this too.
- ACE_SYNCH_CONDITION* cond =
- this->cond_response_available ();
-
- // Ignore any errors, may have been removed by another thread...
- (void) orb_core->remove_follower (cond);
-
- if (cond == 0 || cond->signal () == -1)
- {
- // Yikes, what do we do here????
- return result;
- }
+ this->wake_up ();
return result;
}
+int
+TAO_Wait_On_Leader_Follower::handle_close (void)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon,
+ this->transport_->orb_core ()->leader_follower_lock (),
+ -1);
+ this->reply_received_ = -1;
+ this->wake_up ();
+ return 0;
+}
+
// Register the handler.
int
TAO_Wait_On_Leader_Follower::register_handler (void)
@@ -421,6 +447,33 @@ TAO_Wait_On_Leader_Follower::cond_response_available (void)
return this->cond_response_available_;
}
+void
+TAO_Wait_On_Leader_Follower::wake_up (void)
+{
+ if (ACE_OS::thr_equal (this->calling_thread_, ACE_Thread::self ()))
+ {
+ // We are the leader thread, simply return 0, handle_events()
+ // will return because there was at least one event (this one!)
+ return;
+ }
+
+ // We are not the leader thread, but we have our data, wake up
+ // ourselves and then return 0 so the leader thread can continue
+ // doing its job....
+
+ // At this point we might fail to remove the follower, because
+ // it has been already chosen to become the leader, so it is
+ // awake and will get this too.
+ ACE_SYNCH_CONDITION* cond =
+ this->cond_response_available ();
+
+ // Ignore any errors, may have been removed by another thread...
+ (void) this->transport_->orb_core ()->remove_follower (cond);
+
+ if (cond != 0)
+ (void) cond->signal ();
+}
+
// *********************************************************************
// Constructor.
@@ -459,6 +512,12 @@ TAO_Wait_On_Read::handle_input (void)
return this->transport_->handle_client_input (1);
}
+int
+TAO_Wait_On_Read::handle_close (void)
+{
+ return 0;
+}
+
// No-op.
int
TAO_Wait_On_Read::register_handler (void)
diff --git a/TAO/tao/Wait_Strategy.h b/TAO/tao/Wait_Strategy.h
index 14e02c4edde..a842ac536a7 100644
--- a/TAO/tao/Wait_Strategy.h
+++ b/TAO/tao/Wait_Strategy.h
@@ -59,9 +59,8 @@ public:
virtual int handle_input (void) = 0;
// Handle the input.
- // @@ Alex: this class should *not* depend on the IIOP_Handlers,
- // can't you use TAO_Transport for this? After all it returns an
- // Event_Handler if you need one...
+ virtual int handle_close (void) = 0;
+ // The connection was closed, take appropiate action...
virtual int register_handler (void) = 0;
// Register the handler with the Reactor if it makes sense for the
@@ -103,9 +102,8 @@ public:
// Handle the input. Delegate this job to Transport object. Before
// that suspend the handler in the Reactor.
- // @@ Alex: this class should *not* depend on the IIOP_Handlers,
- // can't you use TAO_Transport for this? After all it returns an
- // Event_Handler if you need one...
+ virtual int handle_close (void);
+ // The connection was closed, take appropiate action...
virtual int register_handler (void);
// Register the handler with the Reactor.
@@ -115,7 +113,7 @@ public:
private:
int reply_received_;
- // This flag indicates if a *complete* reply has been received. Used
+ // This flag indicates if a *complete* reply has been received. Used
// to exit the event loop.
};
@@ -149,7 +147,8 @@ public:
// Handle the input. Delegate this job to Transport object. Before
// that, suspend the handler in the Reactor.
- // @@ Alex: another use of IIOP_Handler...
+ virtual int handle_close (void);
+ // The connection was closed, take appropiate action...
virtual int register_handler (void);
// Register the handler with the Reactor.
@@ -161,6 +160,10 @@ protected:
ACE_SYNCH_CONDITION* cond_response_available (void);
// Return the cond_response_available, initializing it if necessary.
+ void wake_up (void);
+ // Helper method to wake us up when we are a follower...
+
+protected:
ACE_thread_t calling_thread_;
// the thread ID of the thread we were running in.
@@ -200,7 +203,8 @@ public:
virtual int handle_input (void);
// Handle the input. Delegate this job to Transport object.
- // @@ Alex: another use of IIOP_Handler...
+ virtual int handle_close (void);
+ // The connection was closed, take appropiate action...
virtual int register_handler (void);
// No-op. Return 0.