summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjohnc <johnc@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2008-09-01 20:30:50 +0000
committerjohnc <johnc@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2008-09-01 20:30:50 +0000
commitaef1929af22061e18b9a232cb0bc7a4da490be69 (patch)
tree023ee3f72f28a3de6b34c2ea658cb884f71852fd
parent206509905f1f66a618eef734f5ac4060d68a8241 (diff)
downloadATCD-aef1929af22061e18b9a232cb0bc7a4da490be69.tar.gz
ChangeLogTag: Mon Sep 1 20:18:51 UTC 2008 Ciju John <johnc at ociweb dot com>
-rw-r--r--TAO/ChangeLog12
-rw-r--r--TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp8
-rw-r--r--TAO/tao/LocateRequest_Invocation.cpp17
-rw-r--r--TAO/tao/Messaging/Asynch_Invocation.cpp5
-rw-r--r--TAO/tao/Synch_Invocation.cpp75
-rw-r--r--TAO/tao/Transport.cpp13
-rw-r--r--TAO/tao/Transport.h7
7 files changed, 97 insertions, 40 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 7f0ef83103a..978d90aa3e0 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,15 @@
+Mon Sep 1 20:18:51 UTC 2008 Ciju John <johnc at ociweb dot com>
+
+ * tao/DynamicInterface/DII_Invocation_Adapter.cpp:
+ * tao/LocateRequest_Invocation.cpp:
+ * tao/Messaging/Asynch_Invocation.cpp:
+ * tao/Synch_Invocation.cpp:
+ * tao/Transport.h:
+ * tao/Transport.cpp:
+ A transport in connecting state can potentially be referenced by
+ multiple concurrent oneway invocations. Transport access needs
+ to be synchronized during invocation.
+
Fri Aug 29 19:12:49 UTC 2008 Adam Mitz <mitza@ociweb.com>
* tao/Transport_Connector.cpp (wait_for_connection_completion):
diff --git a/TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp b/TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp
index 86757520254..af0170fa6d4 100644
--- a/TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp
+++ b/TAO/tao/DynamicInterface/DII_Invocation_Adapter.cpp
@@ -124,6 +124,9 @@ namespace TAO
throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
}
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock ()
+ , TAO_INVOKE_FAILURE);
+
transport->messaging_object ()->out_stream ().reset_byte_order (
request_->_tao_byte_order ());
@@ -133,11 +136,12 @@ namespace TAO
this->exception_list_,
this->request_);
+ ace_mon.release();
Invocation_Status status = synch.remote_invocation (max_wait_time);
- if (status == TAO_INVOKE_RESTART &&
- (synch.reply_status () == GIOP::LOCATION_FORWARD ||
+ if (status == TAO_INVOKE_RESTART &&
+ (synch.reply_status () == GIOP::LOCATION_FORWARD ||
synch.reply_status () == GIOP::LOCATION_FORWARD_PERM))
{
CORBA::Boolean const permanent_forward =
diff --git a/TAO/tao/LocateRequest_Invocation.cpp b/TAO/tao/LocateRequest_Invocation.cpp
index f489e8e52f1..d87c0e10741 100644
--- a/TAO/tao/LocateRequest_Invocation.cpp
+++ b/TAO/tao/LocateRequest_Invocation.cpp
@@ -59,15 +59,20 @@ namespace TAO
TAO_Transport *transport = this->resolver_.transport ();
- TAO_OutputCDR &cdr = transport->out_stream ();
+ Invocation_Status s = TAO_INVOKE_FAILURE;
+ {
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock ()
+ , TAO_INVOKE_FAILURE);
+ TAO_OutputCDR &cdr = transport->out_stream ();
- if (transport->generate_locate_request (tspec, this->details_, cdr) == -1)
- return TAO_INVOKE_FAILURE;
+ if (transport->generate_locate_request (tspec, this->details_, cdr) == -1)
+ return TAO_INVOKE_FAILURE;
- countdown.update ();
+ countdown.update ();
- Invocation_Status s =
- this->send_message (cdr, TAO_Transport::TAO_TWOWAY_REQUEST, max_wait_time);
+ s = this->send_message (cdr, TAO_Transport::TAO_TWOWAY_REQUEST
+ , max_wait_time);
+ }
if (s != TAO_INVOKE_SUCCESS)
return s;
diff --git a/TAO/tao/Messaging/Asynch_Invocation.cpp b/TAO/tao/Messaging/Asynch_Invocation.cpp
index cc9df631e1c..6799c351a0d 100644
--- a/TAO/tao/Messaging/Asynch_Invocation.cpp
+++ b/TAO/tao/Messaging/Asynch_Invocation.cpp
@@ -65,6 +65,9 @@ namespace TAO
throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
}
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock ()
+ , TAO_INVOKE_FAILURE);
+
TAO_OutputCDR & cdr =
this->resolver_.transport ()->messaging_object ()->out_stream ();
@@ -111,6 +114,8 @@ namespace TAO
TAO_Transport::TAO_ONEWAY_REQUEST,
max_wait_time);
+ ace_mon.release();
+
#if TAO_HAS_INTERCEPTORS == 1
// NOTE: We don't need to do the auto_ptr <> trick. We got here
// in the first place since the message was sent properly,
diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp
index b94a200205f..bf6bdd53f41 100644
--- a/TAO/tao/Synch_Invocation.cpp
+++ b/TAO/tao/Synch_Invocation.cpp
@@ -83,6 +83,9 @@ namespace TAO
throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
}
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock ()
+ , TAO_INVOKE_FAILURE);
+
TAO_OutputCDR &cdr = transport->out_stream ();
cdr.message_attributes (this->details_.request_id (),
@@ -95,7 +98,8 @@ namespace TAO
this->marshal_data (cdr);
- // Register a reply dispatcher for this invocation. Use the
+
+ // Register a reply dispatcher for this invocation. Use the
// preallocated reply dispatcher.
TAO_Bind_Dispatcher_Guard dispatch_guard (
this->details_.request_id (),
@@ -117,6 +121,8 @@ namespace TAO
TAO_Transport::TAO_TWOWAY_REQUEST,
max_wait_time);
+ ace_mon.release();
+
#if TAO_HAS_INTERCEPTORS == 1
// @@NOTE: Too much code repetition.
// If the above call returns a restart due to connection
@@ -652,37 +658,42 @@ namespace TAO
throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
}
- TAO_OutputCDR &cdr = transport->out_stream ();
-
- cdr.message_attributes (this->details_.request_id (),
- this->resolver_.stub (),
- TAO_Transport::TAO_ONEWAY_REQUEST,
- max_wait_time,
- false);
-
- this->write_header (cdr);
-
- this->marshal_data (cdr);
-
- countdown.update ();
-
- if (transport->is_connected ())
- {
- // We have a connected transport so we can send the message
- s = this->send_message (cdr,
- TAO_Transport::TAO_ONEWAY_REQUEST,
- max_wait_time);
- }
- else
- {
- if (TAO_debug_level > 4)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Synch_Oneway_Invocation::"
- "remote_oneway, queueing message\n"));
-
- if (transport->format_queue_message (cdr, max_wait_time) != 0)
- s = TAO_INVOKE_FAILURE;
- }
+ {
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *transport->output_cdr_lock ()
+ , TAO_INVOKE_FAILURE);
+
+ TAO_OutputCDR &cdr = transport->out_stream ();
+
+ cdr.message_attributes (this->details_.request_id (),
+ this->resolver_.stub (),
+ TAO_Transport::TAO_ONEWAY_REQUEST,
+ max_wait_time,
+ false);
+
+ this->write_header (cdr);
+
+ this->marshal_data (cdr);
+
+ countdown.update ();
+
+ if (transport->is_connected ())
+ {
+ // We have a connected transport so we can send the message
+ s = this->send_message (cdr,
+ TAO_Transport::TAO_ONEWAY_REQUEST,
+ max_wait_time);
+ }
+ else
+ {
+ if (TAO_debug_level > 4)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Synch_Oneway_Invocation::"
+ "remote_oneway, queueing message\n"));
+
+ if (transport->format_queue_message (cdr, max_wait_time) != 0)
+ s = TAO_INVOKE_FAILURE;
+ }
+ }
#if TAO_HAS_INTERCEPTORS == 1
s = this->receive_other_interception ();
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 5256ebea64e..5147b49229f 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -167,7 +167,11 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, stats_ (0)
#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
, flush_in_post_open_ (0)
+ , output_cdr_lock_ (0)
{
+ ACE_NEW (this->output_cdr_lock_,
+ ACE_Lock_Adapter <TAO_SYNCH_MUTEX> (this->output_cdr_mutex_));
+
ACE_NEW (this->messaging_object_,
TAO_GIOP_Message_Base (orb_core,
this,
@@ -531,6 +535,9 @@ TAO_Transport::handle_output (ACE_Time_Value *max_wait_time)
this->id ()));
}
+ // drain_queue ultimates touches the OutputCDR and hance needs to be synchronized.
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->output_cdr_lock_, -1);
+
// The flushing strategy (potentially via the Reactor) wants to send
// more data, first check if there is a current message that needs
// more sending...
@@ -2634,6 +2641,12 @@ TAO_Transport::out_stream (void)
return this->messaging_object ()->out_stream ();
}
+ACE_Lock*
+TAO_Transport::output_cdr_lock (void)
+{
+ return this->output_cdr_lock_;
+}
+
void
TAO_Transport::messaging_init (TAO_GIOP_Message_Version const &version)
{
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 120f1b60019..8f09ecf5c00 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -553,6 +553,9 @@ public:
/// Accessor for the output CDR stream
TAO_OutputCDR &out_stream (void);
+ /// Accessor for synchronizing Transport OutputCDR access
+ ACE_Lock* output_cdr_lock (void);
+
/// Set the flush in post open flag
void set_flush_in_post_open (void);
@@ -1086,6 +1089,10 @@ private:
/// Indicate that flushing needs to be done in post_open()
bool flush_in_post_open_;
+ /// lock for synchronizing Transport OutputCDR access
+ mutable ACE_Lock *output_cdr_lock_;
+ mutable TAO_SYNCH_MUTEX output_cdr_mutex_;
+
/*
* specialization hook to add class members from concrete
* transport class onto the base transport class. Please