summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2002-07-12 18:58:13 +0000
committerbala <balanatarajan@users.noreply.github.com>2002-07-12 18:58:13 +0000
commitc0a0f2bd0a456f2734b187bd26f29812464908f2 (patch)
tree8480b60bc3b74d991c5f569786240ce555967441
parent0ef45edb9c0aea062511d046d20f7d12a02c0d5f (diff)
downloadATCD-c0a0f2bd0a456f2734b187bd26f29812464908f2.tar.gz
ChangeLogTag:Fri Jul 12 14:10:14 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/tao/Asynch_Queued_Message.cpp6
-rw-r--r--TAO/tao/Asynch_Queued_Message.h3
-rw-r--r--TAO/tao/ChangeLog15
-rw-r--r--TAO/tao/IIOP_Transport.cpp4
-rw-r--r--TAO/tao/Invocation.cpp11
-rw-r--r--TAO/tao/Invocation.h4
-rw-r--r--TAO/tao/Queued_Message.h3
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp4
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp4
-rw-r--r--TAO/tao/Strategies/UIOP_Transport.cpp4
-rw-r--r--TAO/tao/Synch_Queued_Message.cpp6
-rw-r--r--TAO/tao/Synch_Queued_Message.h7
-rw-r--r--TAO/tao/Transport.cpp109
-rw-r--r--TAO/tao/Transport.h71
14 files changed, 169 insertions, 82 deletions
diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp
index 736a31208fd..3b87bbaf445 100644
--- a/TAO/tao/Asynch_Queued_Message.cpp
+++ b/TAO/tao/Asynch_Queued_Message.cpp
@@ -9,8 +9,10 @@ ACE_RCSID (tao,
TAO_Asynch_Queued_Message::
- TAO_Asynch_Queued_Message (const ACE_Message_Block *contents)
- : offset_ (0)
+ TAO_Asynch_Queued_Message (const ACE_Message_Block *contents,
+ ACE_Allocator *alloc)
+ : TAO_Queued_Message (alloc)
+ , offset_ (0)
{
this->size_ = contents->total_length ();
// @@ Use a pool for these guys!!
diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h
index dbb4ca38bb0..a2708691abd 100644
--- a/TAO/tao/Asynch_Queued_Message.h
+++ b/TAO/tao/Asynch_Queued_Message.h
@@ -39,7 +39,8 @@ public:
* need to hear when the connection timeouts or closes, but
* cannot block waiting for the message to be delivered.
*/
- TAO_Asynch_Queued_Message (const ACE_Message_Block *contents);
+ TAO_Asynch_Queued_Message (const ACE_Message_Block *contents,
+ ACE_Allocator *alloc = 0);
/// Destructor
virtual ~TAO_Asynch_Queued_Message (void);
diff --git a/TAO/tao/ChangeLog b/TAO/tao/ChangeLog
new file mode 100644
index 00000000000..248048a0ea9
--- /dev/null
+++ b/TAO/tao/ChangeLog
@@ -0,0 +1,15 @@
+Fri Jul 12 14:10:14 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * M Asynch_Queued_Message.cpp
+M Asynch_Queued_Message.h
+M IIOP_Transport.cpp
+M Invocation.cpp
+M Invocation.h
+M Queued_Message.h
+M Synch_Queued_Message.cpp
+M Synch_Queued_Message.h
+M Transport.cpp
+M Transport.h
+M Strategies/DIOP_Transport.cpp
+M Strategies/SHMIOP_Transport.cpp
+M Strategies/UIOP_Transport.cpp
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 9ba0eb66310..e63bea76275 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -177,7 +177,7 @@ TAO_IIOP_Transport::send_request (TAO_Stub *stub,
int
TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub,
- int twoway,
+ int write_semantics,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
@@ -186,7 +186,7 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_shared (stub,
- twoway,
+ write_semantics,
stream.begin (),
max_wait_time);
diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp
index a41f8fd538e..2459e750f00 100644
--- a/TAO/tao/Invocation.cpp
+++ b/TAO/tao/Invocation.cpp
@@ -370,7 +370,7 @@ TAO_GIOP_Invocation::prepare_header (CORBA::Octet response_flags
// Send request.
int
-TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous
+TAO_GIOP_Invocation::invoke (CORBA::Boolean write_semantics
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
@@ -386,7 +386,7 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_synchronous
this->transport_->send_request (this->stub_,
this->orb_core_,
this->out_stream_,
- is_synchronous,
+ write_semantics,
this->max_wait_time_);
//
@@ -589,7 +589,8 @@ TAO_GIOP_Synch_Invocation::invoke_i (CORBA::Boolean is_locate_request
}
// Just send the request, without trying to wait for the reply.
- int retval = TAO_GIOP_Invocation::invoke (1 ACE_ENV_ARG_PARAMETER);
+ int retval = TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_TWOWAY_REQUEST
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (retval);
if (retval != TAO_INVOKE_OK)
@@ -962,12 +963,12 @@ TAO_GIOP_Oneway_Invocation::invoke (ACE_ENV_SINGLE_ARG_DECL)
|| this->sync_scope_ == TAO::SYNC_EAGER_BUFFERING
|| this->sync_scope_ == TAO::SYNC_DELAYED_BUFFERING)
{
- return TAO_GIOP_Invocation::invoke (0
+ return TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_ONEWAY_REQUEST
ACE_ENV_ARG_PARAMETER);
}
if (this->sync_scope_ == Messaging::SYNC_WITH_TRANSPORT)
{
- return TAO_GIOP_Invocation::invoke (1
+ return TAO_GIOP_Invocation::invoke (TAO_Transport::TAO_TWOWAY_REQUEST
ACE_ENV_ARG_PARAMETER);
}
diff --git a/TAO/tao/Invocation.h b/TAO/tao/Invocation.h
index ef2ef23d30d..8a1f8658409 100644
--- a/TAO/tao/Invocation.h
+++ b/TAO/tao/Invocation.h
@@ -206,7 +206,7 @@ protected:
* Returns TAO_INVOKE_RESTART if the write call failed and the
* request must be re-attempted.
*
- * @param is_synchronous If set invoke() does not return until the
+ * @param write_semantics If set invoke() does not return until the
* message is completely delivered to the underlying
* transport mechanism, or an error is detected.
*
@@ -214,7 +214,7 @@ protected:
* that the server closed the connection simply to release
* resources.
*/
- int invoke (CORBA::Boolean is_synchronous
+ int invoke (CORBA::Boolean write_semantics
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException));
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
index 591aeb01b11..a49bac11b5c 100644
--- a/TAO/tao/Queued_Message.h
+++ b/TAO/tao/Queued_Message.h
@@ -15,12 +15,13 @@
#include "ace/pre.h"
#include "corbafwd.h"
-#include "LF_Event.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "LF_Event.h"
+
class ACE_Message_Block;
/**
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
index 0b9e65da5de..f8b68f6f5dc 100644
--- a/TAO/tao/Strategies/DIOP_Transport.cpp
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -267,7 +267,7 @@ TAO_DIOP_Transport::send_request (TAO_Stub *stub,
int
TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub,
- int twoway,
+ int write_semantics,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
@@ -280,7 +280,7 @@ TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_shared (stub,
- twoway,
+ write_semantics,
stream.begin (),
max_wait_time);
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index a3e32520f27..1101ee436ce 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -240,7 +240,7 @@ TAO_SHMIOP_Transport::send_request (TAO_Stub *stub,
int
TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub,
- int twoway,
+ int write_semantics,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
@@ -253,7 +253,7 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_shared (stub,
- twoway,
+ write_semantics,
stream.begin (),
max_wait_time);
diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp
index bdc417892dd..def3963d91f 100644
--- a/TAO/tao/Strategies/UIOP_Transport.cpp
+++ b/TAO/tao/Strategies/UIOP_Transport.cpp
@@ -170,7 +170,7 @@ TAO_UIOP_Transport::send_request (TAO_Stub *stub,
int
TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream,
TAO_Stub *stub,
- int twoway,
+ int write_semantics,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
@@ -183,7 +183,7 @@ TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream,
// This guarantees to send all data (bytes) or return an error.
ssize_t n = this->send_message_shared (stub,
- twoway,
+ write_semantics,
stream.begin (),
max_wait_time);
diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp
index ce7bd413d8a..75aa7a8c7d5 100644
--- a/TAO/tao/Synch_Queued_Message.cpp
+++ b/TAO/tao/Synch_Queued_Message.cpp
@@ -8,8 +8,10 @@ ACE_RCSID (tao,
TAO_Synch_Queued_Message::
- TAO_Synch_Queued_Message (const ACE_Message_Block *contents)
- : contents_ (ACE_const_cast (ACE_Message_Block*,contents))
+ TAO_Synch_Queued_Message (const ACE_Message_Block *contents,
+ ACE_Allocator *alloc)
+ : TAO_Queued_Message (alloc)
+ , contents_ (ACE_const_cast (ACE_Message_Block*,contents))
, current_block_ (contents_)
{
}
diff --git a/TAO/tao/Synch_Queued_Message.h b/TAO/tao/Synch_Queued_Message.h
index 2a8c638abf7..b9f4c315971 100644
--- a/TAO/tao/Synch_Queued_Message.h
+++ b/TAO/tao/Synch_Queued_Message.h
@@ -47,8 +47,13 @@ public:
/// Constructor
/**
* @param contents The message block chain that must be sent.
+ *
+ * @param alloc The allocator that is used to allocate objects of
+ * this type.
*/
- TAO_Synch_Queued_Message (const ACE_Message_Block *contents);
+ TAO_Synch_Queued_Message (const ACE_Message_Block *contents;
+
+
/// Destructor
virtual ~TAO_Synch_Queued_Message (void);
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 335a0e75c9c..eb9e8ea890a 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -479,7 +479,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
int
TAO_Transport::send_message_shared (TAO_Stub *stub,
- int is_synchronous,
+ int write_semantics,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time)
{
@@ -490,7 +490,7 @@ TAO_Transport::send_message_shared (TAO_Stub *stub,
if (this->check_event_handler_i ("Transport::send_message_shared") == -1)
return -1;
- r = this->send_message_shared_i (stub, is_synchronous,
+ r = this->send_message_shared_i (stub, write_semantics,
message_block, max_wait_time);
}
if (r == -1)
@@ -509,36 +509,18 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// the message block.
TAO_Synch_Queued_Message synch_message (mb);
- synch_message.push_back (this->head_, this->tail_);
+ int n =
+ this->send_synch_message_helper_i (synch_message,
+ max_wait_time);
- int n = this->drain_queue_i ();
- if (n == -1)
- {
- synch_message.remove_from_list (this->head_, this->tail_);
- ACE_ASSERT (synch_message.next () == 0);
- ACE_ASSERT (synch_message.prev () == 0);
- return -1; // Error while sending...
- }
- else if (n == 1)
- {
- ACE_ASSERT (synch_message.all_data_sent ());
- ACE_ASSERT (synch_message.next () == 0);
- ACE_ASSERT (synch_message.prev () == 0);
- return 1; // Empty queue, message was sent..
- }
+ if (n == -1 ||
+ n == 1)
+ return n;
- ACE_ASSERT (n == 0); // Some data sent, but data remains.
-
- if (synch_message.all_data_sent ())
- {
- ACE_ASSERT (synch_message.next () == 0);
- ACE_ASSERT (synch_message.prev () == 0);
- return 1;
- }
+ ACE_ASSERT (n == 0);
// @todo: Check for timeouts!
// if (max_wait_time != 0 && errno == ETIME) return -1;
-
TAO_Flushing_Strategy *flushing_strategy =
this->orb_core ()->flushing_strategy ();
(void) flushing_strategy->schedule_output (this);
@@ -608,6 +590,68 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
}
+int
+TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
+ ACE_Time_Value *max_wait_time)
+{
+ // Dont clone now.. We could be sent in one shot!
+ TAO_Synch_Queued_Message synch_message (mb);
+
+ int n =
+ this->send_synch_message_helper_i (synch_message,
+ max_wait_time);
+
+ if (n == -1 ||
+ n == 1)
+ return n;
+
+ ACE_ASSERT (n == 0);
+
+ // Till this point we shouldnt have any copying and that is the
+ // point anyway.
+ // Now, remove the node from the list
+
+ // Clone the node
+
+
+
+
+}
+
+int
+TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
+ ACE_Time_Value * /*max_wait_time*/)
+{
+ synch_message.push_back (this->head_, this->tail_);
+
+ // @@todo: Need to send timeouts for writing..
+ int n = this->drain_queue_i ();
+ if (n == -1)
+ {
+ synch_message.remove_from_list (this->head_, this->tail_);
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
+ return -1; // Error while sending...
+ }
+ else if (n == 1)
+ {
+ ACE_ASSERT (synch_message.all_data_sent ());
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
+ return 1; // Empty queue, message was sent..
+ }
+
+ ACE_ASSERT (n == 0); // Some data sent, but data remains.
+
+ if (synch_message.all_data_sent ())
+ {
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
+ return 1;
+ }
+
+ return 0;
+}
void
@@ -659,11 +703,6 @@ TAO_Transport::close_connection_shared (int disable_purge,
this->send_connection_closed_notifications ();
}
-
-
-
-
-
int
TAO_Transport::queue_is_empty_i (void)
{
@@ -671,8 +710,6 @@ TAO_Transport::queue_is_empty_i (void)
}
-
-
int
TAO_Transport::schedule_output_i (void)
{
@@ -1041,11 +1078,11 @@ TAO_Transport::send_connection_closed_notifications (void)
int
TAO_Transport::send_message_shared_i (TAO_Stub *stub,
- int is_synchronous,
+ int write_semantics,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time)
{
- if (is_synchronous)
+ if (write_semantics == TAO_Transport::TAO_TWOWAY_REQUEST)
{
return this->send_synchronous_message_i (message_block,
max_wait_time);
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 4cfc9320f98..5f08cbe7326 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -39,6 +39,7 @@ class TAO_Connection_Handler;
class TAO_Pluggable_Messaging;
class TAO_Queued_Message;
+class TAO_Synch_Queued_Message;
class TAO_Resume_Handle;
@@ -444,8 +445,6 @@ protected:
* will reduce footprint and simplify the process of implementing a
* pluggable protocol.
*/
- // @@ this is broken once we add the lock b/c it returns the thing
- // we're trying to lock down! (CJC)
virtual ACE_Event_Handler * event_handler_i (void) = 0;
virtual TAO_Connection_Handler * connection_handler_i (void) = 0;
@@ -558,6 +557,15 @@ public:
int block = 0);
+ enum
+ {
+ TAO_ONEWAY_REQUEST = 0,
+ TAO_TWOWAY_REQUEST = 1,
+ TAO_REPLY
+ };
+
+
+
/// Prepare the waiting and demuxing strategy to receive a reply for
/// a new request.
/**
@@ -587,7 +595,7 @@ public:
virtual int send_request (TAO_Stub *stub,
TAO_ORB_Core *orb_core,
TAO_OutputCDR &stream,
- int is_synchronous,
+ int write_semantics,
ACE_Time_Value *max_time_wait) = 0;
@@ -601,12 +609,33 @@ public:
* header can finally be set to the proper value.
*
*/
- // @@ lockme
virtual int send_message (TAO_OutputCDR &stream,
TAO_Stub *stub = 0,
- int is_synchronous = 1,
+ int write_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
ACE_Time_Value *max_time_wait = 0) = 0;
+
+ /// Sent the contents of <message_block>
+ /**
+ * @param stub The object reference used for this operation, useful
+ * to obtain the current policies.
+ * @param write_semantics If this is set to TAO_TWO_REQUEST
+ * this method will block until the operation is completely
+ * written on the wire. If it is set to other values this
+ * operation could return.
+ * @param message_block The CDR encapsulation of the GIOP message
+ * that must be sent. The message may consist of
+ * multiple Message Blocks chained through the cont()
+ * field.
+ * @param max_wait_time The maximum time that the operation can
+ * block, used in the implementation of timeouts.
+ */
+ int send_message_shared (TAO_Stub *stub,
+ int write_semantics,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+
protected:
/// Register the handler with the reactor.
/**
@@ -618,7 +647,6 @@ protected:
* thread-per-connection mode. In that case putting the connection
* in the Reactor would produce unpredictable results anyway.
*/
- // @@ lockme
virtual int register_handler_i (void) = 0;
/// Called by the handle_input_i (). This method is used to parse
@@ -687,23 +715,7 @@ public:
size_t &bytes_transferred,
ACE_Time_Value *max_wait_time = 0);
- /// Sent the contents of <message_block>
- /**
- * @param stub The object reference used for this operation, useful
- * to obtain the current policies.
- * @param is_synchronous If set this method will block until the
- * operation is completely written on the wire
- * @param message_block The CDR encapsulation of the GIOP message
- * that must be sent. The message may consist of
- * multiple Message Blocks chained through the cont()
- * field.
- * @param max_wait_time The maximum time that the operation can
- * block, used in the implementation of timeouts.
- */
- int send_message_shared (TAO_Stub *stub,
- int is_synchronous,
- const ACE_Message_Block *message_block,
- ACE_Time_Value *max_wait_time);
+
/// Send a message block chain, assuming the lock is held
int send_message_block_chain_i (const ACE_Message_Block *message_block,
@@ -802,6 +814,17 @@ private:
int send_synchronous_message_i (const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time);
+ /// Send a reply message, i.e. do not block until the message is on
+ /// the wire, but just return after adding them to the queue.
+ int send_reply_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+ /// A helper method used by <send_synchronous_message_i> and
+ /// <send_reply_message_i>. Reusable code that could be used by both
+ /// the methods.
+ int send_synch_message_helper_i (TAO_Synch_Queued_Message &s,
+ ACE_Time_Value *max_wait_time);
+
/// Check if the flush timer is still pending
int flush_timer_pending (void) const;
@@ -841,7 +864,7 @@ private:
/// Implement send_message_shared() assuming the handler_lock_ is
/// held.
int send_message_shared_i (TAO_Stub *stub,
- int is_synchronous,
+ int write_semantics,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time);