author     bala <balanatarajan@users.noreply.github.com>  2001-07-26 03:23:33 +0000
committer  bala <balanatarajan@users.noreply.github.com>  2001-07-26 03:23:33 +0000
commit     c420f71fadfbdeb9ce2adb58924e07d44ddcec37 (patch)
tree       56dbec3a4cfc8636f233bd08f627161096ad5312
parent     d1673babd90169fca5fc40a1c37f1b863b284701 (diff)
download   ATCD-c420f71fadfbdeb9ce2adb58924e07d44ddcec37.tar.gz
ChangeLogTag:Wed Jul 25 22:10:21 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--  TAO/ChangeLogs/ChangeLog-02a  40
-rw-r--r--  TAO/tao/Transport.cpp         80
-rw-r--r--  TAO/tao/Transport.h            3
3 files changed, 59 insertions, 64 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 10cd6e58882..123d93661f7 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,39 @@
+Wed Jul 25 22:10:21 2001 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Transport.cpp:
+ * tao/Transport.h: Fixed a subtle problem that seems to have led
+ to the Muxing tests failing randomly. The problem is something
+ like this:
+ - multiple client threads can try to share a connection
+ - because of the above, more than one message is sent on the
+ same connection
+ - if the server is multi-threaded, the messages can be processed
+ concurrently
+ - two or more replies may then arrive on the same connection
+ - one of the client threads can pick up both replies
+ - the first reply can be its own, while the other one gets
+ queued up
+ - after queueing up the second reply, it would wake up another
+ thread
+ - if the woken-up thread does not own the queued reply, it would
+ just take the reply and try to transfer ownership to the right
+ thread
+ - before the second thread transfers the reply, it would have
+ resumed the handle, and because of that one of the threads
+ would have gone into the reactor from the Leader/Followers (LF)
+ set
+ - at exactly that instant the second thread will have difficulty
+ waking up the thread blocked in select (), as that thread is
+ the owner.
+ Fixed this problem by not resuming the handle till we dispatch
+ the reply. We don't buy anything by resuming the handle before
+ dispatching the reply, because the dispatching is bounded. The
+ forces that apply to a server thread, which resumes the handle
+ before making an upcall, do not apply to the client threads that
+ read and process replies. This fix should ideally fix the Muxing
+ test failures on different platforms. If it doesn't, it will at
+ least prevent the race condition outlined above :-)
+
Wed Jul 25 20:33:21 2001 Balachandran Natarajan <bala@cs.wustl.edu>
* examples/Simple/time-date/Makefile.bor:
@@ -11,7 +47,7 @@ Wed Jul 25 12:50:00 2001 Michael Kircher <Michael.Kircher@mchp.siemens.de>
* tao/Strategies/DIOP_Factory.cpp:
Changed the return value of requires_explicit_endpoint () to 0
- and documented that this return code is not reflecting that
+ and documented that this return code is not reflecting that
the endpoints are not cleaned-up but that we disable it by default
because DIOP is only suitable for certain use cases, e.g. it only
supports one-ways.
@@ -28,7 +64,7 @@ Tue Jul 25 01:00:00 2001 Craig Rodrigues <crodrigu@bbn.com>
* orbsvcs/tests/AVVStreams/Asynch_Three_Stage/run_test.pl:
* orbsvcs/tests/AVStreams/Asynch_Three_Stage/input:
- Increase the times that the perl scripts expecting the
+ Increase the times that the perl scripts expecting the
the CORBA processes to run for. Decrease the size
of the Asynch_Three_Stage input file by 4000 lines.
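
The ChangeLog entry above turns on a single ordering decision: when the
reactor handle is resumed relative to dispatching a reply. The following
minimal sketch (plain C++, not TAO code; every name in it is a
hypothetical stand-in) illustrates the ordering the fix enforces: resume
before processing a request, whose upcall is unbounded, but only after
dispatching a reply, which is bounded.

#include <iostream>

// Hypothetical stand-ins, not the real TAO classes.
enum MessageType { REQUEST, REPLY };

struct ResumeHandle {
  bool resumed = false;
  void resume_handle () {
    if (!resumed) {
      resumed = true;
      std::cout << "handle resumed; another thread may enter the reactor\n";
    }
  }
};

// Mirrors the shape of the patched process_parsed_messages ().
int process_parsed_message (MessageType t, ResumeHandle &rh) {
  if (t == REQUEST) {
    // Server side: the upcall is unbounded, so resume early and let
    // other threads service the connection while we work.
    rh.resume_handle ();
    std::cout << "processing request (unbounded upcall)\n";
  } else {
    std::cout << "dispatching reply (bounded work)\n";
    // Client side: dispatch first, resume after -- the ordering change
    // that closes the race described in the entry above.
    rh.resume_handle ();
  }
  return 0;
}

int main () {
  ResumeHandle r1, r2;
  process_parsed_message (REQUEST, r1);
  process_parsed_message (REPLY, r2);
  return 0;
}
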
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 09212a06a5b..020affe6c9a 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -906,11 +906,9 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
return this->consolidate_fragments (nqd, rh);
}
- // Resume before starting to process the request..
- rh.resume_handle ();
-
// Process the message
- return this->process_parsed_messages (&qd);
+ return this->process_parsed_messages (&qd,
+ rh);
}
int
@@ -1100,12 +1098,10 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
return this->consolidate_fragments (nqd, rh);
}
- // Resume the handle before processing the request
- rh.resume_handle ();
-
// Now we have a full message in our buffer. Just go ahead and
// process that
- return this->process_parsed_messages (&pqd);
+ return this->process_parsed_messages (&pqd,
+ rh);
}
int
@@ -1297,7 +1293,7 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n"
- "..............extracting extra messages \n",
+ ".............. extracting extra messages \n",
this->id ()));
}
@@ -1334,7 +1330,8 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block
}
int
-TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd)
+TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
+ TAO_Resume_Handle &rh)
{
// Get the <message_type> that we have received
TAO_Pluggable_Message_Type t = qd->msg_type_;
@@ -1357,6 +1354,9 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd)
else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST ||
t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST)
{
+ // Let us resume the handle before we go ahead and process the
+ // request. This will open up the handle to other threads.
+ rh.resume_handle ();
if (this->messaging_object ()->process_request_message (
this,
qd) == -1)
@@ -1392,35 +1392,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd)
result = this->tms ()->dispatch_reply (params);
- // @@ Somehow it seems dangerous to reset the state *after*
- // dispatching the request, what if another threads receives
- // another reply in the same connection?
- // My guess is that it works as follows:
- // - For the exclusive case there can be no such thread.
- // - The the muxed case each thread has its own message_state.
- // I'm pretty sure this comment is right. Could somebody else
- // please look at it and confirm my guess?
-
- // @@ The above comment was found in the older versions of the
- // code. The code was also written in such a way that, when
- // the client thread on a call from handle_input () from the
- // reactor a call would be made on the handle_client_input
- // (). The implementation of handle_client_input () looked so
- // flaky. It used to create a message state upon entry in to
- // the function using the TMS and destroy that on exit. All
- // this was fine _theoretically_ for multiple threads. But
- // the flakiness was originating in the implementation of
- // get_message_state () where we were creating message state
- // only once and dishing it out for every thread till one of
- // them destroy's it. So, it looked broken. That has been
- // changed. Why?. To my knowledge, the reactor does not call
- // handle_input () on two threads at the same time. So, IMHO
- // that defeats the purpose of creating a message state for
- // every thread. This is just my guess. If we run in to
- // problems this place needs to be revisited. If someone else
- // is going to take a look please contact bala@cs.wustl.edu
- // for details on this-- Bala
-
if (result == -1)
{
// Something really critical happened, we will forget about
@@ -1436,26 +1407,13 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd)
return -1;
}
- if (result == 0)
- {
-
- this->messaging_object ()->reset ();
-
- // The reply dispatcher was no longer registered.
- // This can happened when the request/reply
- // times out.
- // To throw away all registered reply handlers is
- // not the right thing, as there might be just one
- // old reply coming in and several valid new ones
- // pending. If we would invoke <connection_closed>
- // we would throw away also the valid ones.
- //return 0;
- }
-
-
- // This is a NOOP for the Exclusive request case, but it actually
- // destroys the stream in the muxed case.
- //this->tms_->destroy_message_state (message_state);
+ // If we have received a reply, we resume only after dispatching
+ // the reply. We know that dispatching a reply is bounded and will
+ // not affect the concurrency at any point.
+ // @@ todo: need to think about what we gain by doing the resume
+ // here. Anyway, once handle_input_i () returns we should be
+ // fine, right?
+ rh.resume_handle ();
}
else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
{
@@ -1585,11 +1543,11 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
// As we are ready to process the last message just resume
// the handle. Set the flag incase someone had reset the flag..
rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
- rh.resume_handle ();
}
// Process the message...
- if (this->process_parsed_messages (qd) == -1)
+ if (this->process_parsed_messages (qd,
+ rh) == -1)
return -1;
// Delete the Queued_Data..
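
The process_queue_head () hunk above drops its explicit resume and only
sets TAO_HANDLE_RESUMABLE, trusting process_parsed_messages () to do the
actual resume after dispatch. Assuming TAO_Resume_Handle resumes at most
once and only when flagged resumable (an assumption made here for
illustration, not something this patch confirms), a guard of roughly
this shape makes that hand-off safe:

#include <iostream>

// Hypothetical guard, not the real TAO_Resume_Handle.
class ResumeGuard {
public:
  enum Flag { TAO_HANDLE_LEAVE_SUSPENDED, TAO_HANDLE_RESUMABLE };

  void set_flag (Flag f) { flag_ = f; }

  // Resume at most once, and only when marked resumable; later calls
  // become no-ops, so the earliest legal resume point "wins".
  void resume_handle () {
    if (flag_ == TAO_HANDLE_RESUMABLE && !resumed_) {
      resumed_ = true;
      std::cout << "handle handed back to the reactor\n";
    }
  }

private:
  Flag flag_ = TAO_HANDLE_RESUMABLE;
  bool resumed_ = false;
};

int main () {
  ResumeGuard rh;
  rh.set_flag (ResumeGuard::TAO_HANDLE_RESUMABLE); // as process_queue_head () does
  rh.resume_handle ();  // from process_parsed_messages (), after dispatch
  rh.resume_handle ();  // harmless: already resumed
  return 0;
}
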
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 6405bbd50f8..93e4ee7d7a4 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -660,7 +660,8 @@ protected:
/// Process the message by sending it to the higher layers of the
/// ORB.
- int process_parsed_messages (TAO_Queued_Data *qd);
+ int process_parsed_messages (TAO_Queued_Data *qd,
+ TAO_Resume_Handle &rh);
/// Make a queued data from the <incoming> message block
TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming);
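
Taken together, the header and call-site changes thread the
TAO_Resume_Handle through to process_parsed_messages (), moving the
resume decision into the one place that knows whether a request or a
reply is being handled. A rough sketch of the resulting calling
convention (hypothetical stand-in types, not the real TAO classes):

#include <iostream>

struct QueuedData { const char *payload; };
struct ResumeHandle { void resume_handle () { std::cout << "resumed\n"; } };

struct Transport {
  // New shape: the resume handle is passed down so that the callee
  // decides when the handle goes back to the reactor.
  int process_parsed_messages (QueuedData *qd, ResumeHandle &rh) {
    std::cout << "dispatching: " << qd->payload << '\n';
    rh.resume_handle (); // only after the (bounded) dispatch finishes
    return 0;
  }

  int handle_input_i (ResumeHandle &rh) {
    QueuedData qd = { "reply" };
    // The early rh.resume_handle () that used to live here is gone;
    // the resume decision now belongs to process_parsed_messages ().
    return this->process_parsed_messages (&qd, rh);
  }
};

int main () {
  Transport t;
  ResumeHandle rh;
  return t.handle_input_i (rh);
}
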