diff options
author | johnc <johnc@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-09-01 20:30:50 +0000 |
---|---|---|
committer | johnc <johnc@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-09-01 20:30:50 +0000 |
commit | aef1929af22061e18b9a232cb0bc7a4da490be69 (patch) | |
tree | 023ee3f72f28a3de6b34c2ea658cb884f71852fd | |
parent | 206509905f1f66a618eef734f5ac4060d68a8241 (diff) | |
download | ATCD-aef1929af22061e18b9a232cb0bc7a4da490be69.tar.gz |
ChangeLogTag: Mon Sep 1 20:18:51 UTC 2008 Ciju John <johnc at ociweb dot com>
-rw-r--r-- | TAO/ChangeLog | 12 | ||||
-rw-r--r-- | TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp | 8 | ||||
-rw-r--r-- | TAO/tao/LocateRequest_Invocation.cpp | 17 | ||||
-rw-r--r-- | TAO/tao/Messaging/Asynch_Invocation.cpp | 5 | ||||
-rw-r--r-- | TAO/tao/Synch_Invocation.cpp | 75 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 13 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 7 |
7 files changed, 97 insertions, 40 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 7f0ef83103a..978d90aa3e0 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,15 @@ +Mon Sep 1 20:18:51 UTC 2008 Ciju John <johnc at ociweb dot com> + + * tao/DynamicInterface/DII_Invocation_Adapter.cpp: + * tao/LocateRequest_Invocation.cpp: + * tao/Messaging/Asynch_Invocation.cpp: + * tao/Synch_Invocation.cpp: + * tao/Transport.h: + * tao/Transport.cpp: + A transport in connecting state can potentially be referenced by + multiple concurrent oneway invocations. Transport access needs + to be synchronized during invocation. + Fri Aug 29 19:12:49 UTC 2008 Adam Mitz <mitza@ociweb.com> * tao/Transport_Connector.cpp (wait_for_connection_completion): diff --git a/TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp b/TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp index 86757520254..af0170fa6d4 100644 --- a/TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp +++ b/TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp @@ -124,6 +124,9 @@ namespace TAO throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO); } + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock () + , TAO_INVOKE_FAILURE); + transport->messaging_object ()->out_stream ().reset_byte_order ( request_->_tao_byte_order ()); @@ -133,11 +136,12 @@ namespace TAO this->exception_list_, this->request_); + ace_mon.release(); Invocation_Status status = synch.remote_invocation (max_wait_time); - if (status == TAO_INVOKE_RESTART && - (synch.reply_status () == GIOP::LOCATION_FORWARD || + if (status == TAO_INVOKE_RESTART && + (synch.reply_status () == GIOP::LOCATION_FORWARD || synch.reply_status () == GIOP::LOCATION_FORWARD_PERM)) { CORBA::Boolean const permanent_forward = diff --git a/TAO/tao/LocateRequest_Invocation.cpp b/TAO/tao/LocateRequest_Invocation.cpp index f489e8e52f1..d87c0e10741 100644 --- a/TAO/tao/LocateRequest_Invocation.cpp +++ b/TAO/tao/LocateRequest_Invocation.cpp @@ -59,15 +59,20 @@ namespace TAO TAO_Transport *transport = this->resolver_.transport (); - TAO_OutputCDR &cdr = transport->out_stream (); + Invocation_Status s = TAO_INVOKE_FAILURE; + { + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock () + , TAO_INVOKE_FAILURE); + TAO_OutputCDR &cdr = transport->out_stream (); - if (transport->generate_locate_request (tspec, this->details_, cdr) == -1) - return TAO_INVOKE_FAILURE; + if (transport->generate_locate_request (tspec, this->details_, cdr) == -1) + return TAO_INVOKE_FAILURE; - countdown.update (); + countdown.update (); - Invocation_Status s = - this->send_message (cdr, TAO_Transport::TAO_TWOWAY_REQUEST, max_wait_time); + s = this->send_message (cdr, TAO_Transport::TAO_TWOWAY_REQUEST + , max_wait_time); + } if (s != TAO_INVOKE_SUCCESS) return s; diff --git a/TAO/tao/Messaging/Asynch_Invocation.cpp b/TAO/tao/Messaging/Asynch_Invocation.cpp index cc9df631e1c..6799c351a0d 100644 --- a/TAO/tao/Messaging/Asynch_Invocation.cpp +++ b/TAO/tao/Messaging/Asynch_Invocation.cpp @@ -65,6 +65,9 @@ namespace TAO throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO); } + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock () + , TAO_INVOKE_FAILURE); + TAO_OutputCDR & cdr = this->resolver_.transport ()->messaging_object ()->out_stream (); @@ -111,6 +114,8 @@ namespace TAO TAO_Transport::TAO_ONEWAY_REQUEST, max_wait_time); + ace_mon.release(); + #if TAO_HAS_INTERCEPTORS == 1 // NOTE: We don't need to do the auto_ptr <> trick. We got here // in the first place since the message was sent properly, diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp index b94a200205f..bf6bdd53f41 100644 --- a/TAO/tao/Synch_Invocation.cpp +++ b/TAO/tao/Synch_Invocation.cpp @@ -83,6 +83,9 @@ namespace TAO throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO); } + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock () + , TAO_INVOKE_FAILURE); + TAO_OutputCDR &cdr = transport->out_stream (); cdr.message_attributes (this->details_.request_id (), @@ -95,7 +98,8 @@ namespace TAO this->marshal_data (cdr); - // Register a reply dispatcher for this invocation. Use the + + // Register a reply dispatcher for this invocation. Use the // preallocated reply dispatcher. TAO_Bind_Dispatcher_Guard dispatch_guard ( this->details_.request_id (), @@ -117,6 +121,8 @@ namespace TAO TAO_Transport::TAO_TWOWAY_REQUEST, max_wait_time); + ace_mon.release(); + #if TAO_HAS_INTERCEPTORS == 1 // @@NOTE: Too much code repetition. // If the above call returns a restart due to connection @@ -652,37 +658,42 @@ namespace TAO throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO); } - TAO_OutputCDR &cdr = transport->out_stream (); - - cdr.message_attributes (this->details_.request_id (), - this->resolver_.stub (), - TAO_Transport::TAO_ONEWAY_REQUEST, - max_wait_time, - false); - - this->write_header (cdr); - - this->marshal_data (cdr); - - countdown.update (); - - if (transport->is_connected ()) - { - // We have a connected transport so we can send the message - s = this->send_message (cdr, - TAO_Transport::TAO_ONEWAY_REQUEST, - max_wait_time); - } - else - { - if (TAO_debug_level > 4) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Synch_Oneway_Invocation::" - "remote_oneway, queueing message\n")); - - if (transport->format_queue_message (cdr, max_wait_time) != 0) - s = TAO_INVOKE_FAILURE; - } + { + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock () + , TAO_INVOKE_FAILURE); + + TAO_OutputCDR &cdr = transport->out_stream (); + + cdr.message_attributes (this->details_.request_id (), + this->resolver_.stub (), + TAO_Transport::TAO_ONEWAY_REQUEST, + max_wait_time, + false); + + this->write_header (cdr); + + this->marshal_data (cdr); + + countdown.update (); + + if (transport->is_connected ()) + { + // We have a connected transport so we can send the message + s = this->send_message (cdr, + TAO_Transport::TAO_ONEWAY_REQUEST, + max_wait_time); + } + else + { + if (TAO_debug_level > 4) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Synch_Oneway_Invocation::" + "remote_oneway, queueing message\n")); + + if (transport->format_queue_message (cdr, max_wait_time) != 0) + s = TAO_INVOKE_FAILURE; + } + } #if TAO_HAS_INTERCEPTORS == 1 s = this->receive_other_interception (); diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 5256ebea64e..5147b49229f 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -167,7 +167,11 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , stats_ (0) #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */ , flush_in_post_open_ (0) + , output_cdr_lock_ (0) { + ACE_NEW (this->output_cdr_lock_, + ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (this->output_cdr_mutex_)); + ACE_NEW (this->messaging_object_, TAO_GIOP_Message_Base (orb_core, this, @@ -531,6 +535,9 @@ TAO_Transport::handle_output (ACE_Time_Value *max_wait_time) this->id ())); } + // drain_queue ultimates touches the OutputCDR and hance needs to be synchronized. + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->output_cdr_lock_, -1); + // The flushing strategy (potentially via the Reactor) wants to send // more data, first check if there is a current message that needs // more sending... @@ -2634,6 +2641,12 @@ TAO_Transport::out_stream (void) return this->messaging_object ()->out_stream (); } +ACE_Lock* +TAO_Transport::output_cdr_lock (void) +{ + return this->output_cdr_lock_; +} + void TAO_Transport::messaging_init (TAO_GIOP_Message_Version const &version) { diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 120f1b60019..8f09ecf5c00 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -553,6 +553,9 @@ public: /// Accessor for the output CDR stream TAO_OutputCDR &out_stream (void); + /// Accessor for synchronizing Transport OutputCDR access + ACE_Lock* output_cdr_lock (void); + /// Set the flush in post open flag void set_flush_in_post_open (void); @@ -1086,6 +1089,10 @@ private: /// Indicate that flushing needs to be done in post_open() bool flush_in_post_open_; + /// lock for synchronizing Transport OutputCDR access + mutable ACE_Lock *output_cdr_lock_; + mutable TAO_SYNCH_MUTEX output_cdr_mutex_; + /* * specialization hook to add class members from concrete * transport class onto the base transport class. Please |