diff options
author | bala <balanatarajan@users.noreply.github.com> | 2001-07-26 03:23:33 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2001-07-26 03:23:33 +0000 |
commit | c420f71fadfbdeb9ce2adb58924e07d44ddcec37 (patch) | |
tree | 56dbec3a4cfc8636f233bd08f627161096ad5312 | |
parent | d1673babd90169fca5fc40a1c37f1b863b284701 (diff) | |
download | ATCD-c420f71fadfbdeb9ce2adb58924e07d44ddcec37.tar.gz |
ChangeLogTag:Wed Jul 25 22:10:21 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 40 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 80 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 3 |
3 files changed, 59 insertions, 64 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 10cd6e58882..123d93661f7 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,39 @@ +Wed Jul 25 22:10:21 2001 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Transport.cpp: + * tao/Transport.h: Fixed a subtle problem that seems to have led + to the Muxing tests failing randomly. The problem is something + like this + - multiple client threads can try to share a connection + - because of the above, more than one message are sent on the + same connection + - if the server is multi-threaded, the messages can be processed + concurrently + - there may be a possibility of more than two replies coming on + the same connection. + - one of the client threads can pick up both the replies + - one of the replies would be queued up and the first one can be + its own + - after queueing up the second it would wake up another thread + - if the woken up thread does not own the reply, it could just + take the reply and try to transfer ownership to the right + thread. + - before the second thread transfers the reply, the second + thread would have resumed the handler and because of which one + of the threads would have gone into the reactor from the LF. + - at exactly the same instant the second thread will have + difficulty in waking up the thread on select () as it is the + owner. + Fixed this problem by not resuming the handle till we dispatch + the reply. We don't buy anything by resuming the handle before + dispatching the reply because, the dispatching will not be + unbounded. The forces that apply to the server thread, which + resumes the handle before making an upcall does not apply to the + client threads that reads and processes replies. This fix should + ideally fix the Muxing test failure on different platforms. 
If + it doesn't, it will at least prevent the race condition outlined + above :-) + Wed Jul 25 20:33:21 2001 Balachandran Natarajan <bala@cs.wustl.edu> * examples/Simple/time-date/Makefile.bor: @@ -11,7 +47,7 @@ Wed Jul 25 12:50:00 2001 Michael Kircher <Michael.Kircher@mchp.siemens.de> * tao/Strategies/DIOP_Factory.cpp: Changed the return value of requires_explicit_endpoint () to 0 - and documented that this return code is not reflecting that + and documented that this return code is not reflecting that the endpoints are not cleaned-up but that we disable it by default because DIOP is only suitable for certain use cases, e.g. it only supports one-ways. @@ -28,7 +64,7 @@ Tue Jul 25 01:00:00 2001 Craig Rodrigues <crodrigu@bbn.com> * orbsvcs/tests/AVVStreams/Asynch_Three_Stage/run_test.pl: * orbsvcs/tests/AVStreams/Asynch_Three_Stage/input: - Increase the times that the perl scripts expecting the + Increase the times that the perl scripts expecting the the CORBA processes to run for. Decrease the size of the Asynch_Three_Stage input file by 4000 lines. diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 09212a06a5b..020affe6c9a 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -906,11 +906,9 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, return this->consolidate_fragments (nqd, rh); } - // Resume before starting to process the request.. - rh.resume_handle (); - // Process the message - return this->process_parsed_messages (&qd); + return this->process_parsed_messages (&qd, + rh); } int @@ -1100,12 +1098,10 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming, return this->consolidate_fragments (nqd, rh); } - // Resume the handle before processing the request - rh.resume_handle (); - // Now we have a full message in our buffer. 
Just go ahead and // process that - return this->process_parsed_messages (&pqd); + return this->process_parsed_messages (&pqd, + rh); } int @@ -1297,7 +1293,7 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n" - "..............extracting extra messages \n", + ".............. extracting extra messages \n", this->id ())); } @@ -1334,7 +1330,8 @@ TAO_Transport::consolidate_extra_messages (ACE_Message_Block } int -TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd) +TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh) { // Get the <message_type> that we have received TAO_Pluggable_Message_Type t = qd->msg_type_; @@ -1357,6 +1354,9 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd) else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST || t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST) { + // Let us resume the handle before we go ahead to process the + // request. This will open up the handle for othe threads. + rh.resume_handle (); if (this->messaging_object ()->process_request_message ( this, qd) == -1) @@ -1392,35 +1392,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd) result = this->tms ()->dispatch_reply (params); - // @@ Somehow it seems dangerous to reset the state *after* - // dispatching the request, what if another threads receives - // another reply in the same connection? - // My guess is that it works as follows: - // - For the exclusive case there can be no such thread. - // - The the muxed case each thread has its own message_state. - // I'm pretty sure this comment is right. Could somebody else - // please look at it and confirm my guess? - - // @@ The above comment was found in the older versions of the - // code. The code was also written in such a way that, when - // the client thread on a call from handle_input () from the - // reactor a call would be made on the handle_client_input - // (). 
The implementation of handle_client_input () looked so - // flaky. It used to create a message state upon entry in to - // the function using the TMS and destroy that on exit. All - // this was fine _theoretically_ for multiple threads. But - // the flakiness was originating in the implementation of - // get_message_state () where we were creating message state - // only once and dishing it out for every thread till one of - // them destroy's it. So, it looked broken. That has been - // changed. Why?. To my knowledge, the reactor does not call - // handle_input () on two threads at the same time. So, IMHO - // that defeats the purpose of creating a message state for - // every thread. This is just my guess. If we run in to - // problems this place needs to be revisited. If someone else - // is going to take a look please contact bala@cs.wustl.edu - // for details on this-- Bala - if (result == -1) { // Something really critical happened, we will forget about @@ -1436,26 +1407,13 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd) return -1; } - if (result == 0) - { - - this->messaging_object ()->reset (); - - // The reply dispatcher was no longer registered. - // This can happened when the request/reply - // times out. - // To throw away all registered reply handlers is - // not the right thing, as there might be just one - // old reply coming in and several valid new ones - // pending. If we would invoke <connection_closed> - // we would throw away also the valid ones. - //return 0; - } - - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - //this->tms_->destroy_message_state (message_state); + // If we have received a reply, we resume after dispatching the + // reply. We know that dispatching a reply is bounded and will + // not affect the concurrency at any point. + // @@ todo: need to think what do we win by doing this + // here. 
Anyway, when the handle_input_i () returns we should be + // fine, right? + rh.resume_handle (); } else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) { @@ -1585,11 +1543,11 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) // As we are ready to process the last message just resume // the handle. Set the flag incase someone had reset the flag.. rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); - rh.resume_handle (); } // Process the message... - if (this->process_parsed_messages (qd) == -1) + if (this->process_parsed_messages (qd, + rh) == -1) return -1; // Delete the Queued_Data.. diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 6405bbd50f8..93e4ee7d7a4 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -660,7 +660,8 @@ protected: /// Process the message by sending it to the higher layers of the /// ORB. - int process_parsed_messages (TAO_Queued_Data *qd); + int process_parsed_messages (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh); /// Make a queued data from the <incoming> message block TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming); |