summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-11 17:23:01 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-11 17:23:01 +0000
commita55b89c8797fffd6f7ce8159079372837645e3d8 (patch)
tree868c23822ee919ce3c2d5f2e1f5341cf28136b61
parent7ae0b01904341fd5c1421123b9397927718384a0 (diff)
downloadATCD-a55b89c8797fffd6f7ce8159079372837645e3d8.tar.gz
ChangeLogTag:Wed Apr 11 10:21:35 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a22
-rw-r--r--TAO/tao/Block_Flushing_Strategy.cpp12
-rw-r--r--TAO/tao/Block_Flushing_Strategy.h1
-rw-r--r--TAO/tao/Flushing_Strategy.h3
-rw-r--r--TAO/tao/Reactive_Flushing_Strategy.cpp27
-rw-r--r--TAO/tao/Reactive_Flushing_Strategy.h1
-rw-r--r--TAO/tao/Transport.cpp22
-rw-r--r--TAO/tao/Transport.h57
8 files changed, 94 insertions, 51 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index e4a1315b0f4..a95e7a9e5b0 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,25 @@
+Wed Apr 11 10:21:35 2001 Carlos O'Ryan <coryan@uci.edu>
+
+ * With the following changes the semantics of oneways are
+ bug-compatible with the main trunk.
+
+ * tao/Transport.cpp:
+ Add new method to check if the transport queue has any data
+ pending.
+
+ * tao/Flushing_Strategy.h:
+ Add new operation to block until the queue becomes completely
+ empty.
+
+ * tao/Block_Flushing_Strategy.h:
+ * tao/Block_Flushing_Strategy.cpp:
+ * tao/Reactive_Flushing_Strategy.h:
+ * tao/Reactive_Flushing_Strategy.cpp:
+ Implement flush_transport() in each concrete strategy.
+
+ * tao/Transport.h:
+ Remove old code that was commented out anyway.
+
Mon Apr 09 00:41:20 2001 Carlos O'Ryan <coryan@uci.edu>
* tao/Invocation.cpp:
diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp
index 4703a5e6455..c5e680f7290 100644
--- a/TAO/tao/Block_Flushing_Strategy.cpp
+++ b/TAO/tao/Block_Flushing_Strategy.cpp
@@ -31,3 +31,15 @@ TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport,
}
return 0;
}
+
+int
+TAO_Block_Flushing_Strategy::flush_transport (TAO_Transport *transport)
+{
+ while (!transport->queue_is_empty ())
+ {
+ int result = transport->handle_output ();
+ if (result == -1)
+ return -1;
+ }
+ return 0;
+}
diff --git a/TAO/tao/Block_Flushing_Strategy.h b/TAO/tao/Block_Flushing_Strategy.h
index 82af3a0cd3e..f2524ac2e7b 100644
--- a/TAO/tao/Block_Flushing_Strategy.h
+++ b/TAO/tao/Block_Flushing_Strategy.h
@@ -32,6 +32,7 @@ public:
virtual int cancel_output (TAO_Transport *transport);
virtual int flush_message (TAO_Transport *transport,
TAO_Queued_Message *msg);
+ virtual int flush_transport (TAO_Transport *transport);
};
#include "ace/post.h"
diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h
index 11d6cda2983..5d1257a802c 100644
--- a/TAO/tao/Flushing_Strategy.h
+++ b/TAO/tao/Flushing_Strategy.h
@@ -59,6 +59,9 @@ public:
/// flushed too, for example, because there are ahead in the queue.
virtual int flush_message (TAO_Transport *transport,
TAO_Queued_Message *msg) = 0;
+
+ /// Wait until the transport has no messages queued.
+ virtual int flush_transport (TAO_Transport *transport) = 0;
};
#include "ace/post.h"
diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp
index 8bf90a36a1d..ef3d1521924 100644
--- a/TAO/tao/Reactive_Flushing_Strategy.cpp
+++ b/TAO/tao/Reactive_Flushing_Strategy.cpp
@@ -47,3 +47,30 @@ TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport,
return result;
}
+
+int
+TAO_Reactive_Flushing_Strategy::flush_transport (TAO_Transport *transport)
+{
+ // @@ Should we pass this down? Can we?
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ TAO_ORB_Core *orb_core = transport->orb_core ();
+
+ while (!transport->queue_is_empty ())
+ {
+ int result = orb_core->run (0, 1, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (result == -1)
+ return -1;
+ }
+ }
+ ACE_CATCHANY
+ {
+ return -1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
diff --git a/TAO/tao/Reactive_Flushing_Strategy.h b/TAO/tao/Reactive_Flushing_Strategy.h
index c1a33aaef5b..16371c75253 100644
--- a/TAO/tao/Reactive_Flushing_Strategy.h
+++ b/TAO/tao/Reactive_Flushing_Strategy.h
@@ -32,6 +32,7 @@ public:
virtual int cancel_output (TAO_Transport *transport);
virtual int flush_message (TAO_Transport *transport,
TAO_Queued_Message *msg);
+ virtual int flush_transport (TAO_Transport *transport);
};
#include "ace/post.h"
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 19fc4c2366b..a5ed9ddd8c1 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -319,6 +319,11 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
size_t byte_count = 0;
ssize_t n;
+
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+
+ int start_flushing = 1;
if (try_sending_first)
{
// ... in this case we must try to send the message first ...
@@ -365,6 +370,8 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
}
}
+ (void) flushing_strategy->schedule_output (this);
+
// ... either the message must be queued or we need to queue it
// because it was not completely sent out ...
@@ -385,9 +392,13 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
// ... if the queue is full we need to activate the output on the
// queue ...
- if (this->must_flush_queue_i (stub))
+ if (start_flushing || this->must_flush_queue_i (stub))
{
- this->orb_core ()->flushing_strategy ()->schedule_output (this);
+ typedef ACE_Reverse_Lock<TAO_SYNCH_MUTEX> TAO_REVERSE_SYNCH_MUTEX;
+ TAO_REVERSE_SYNCH_MUTEX reverse (this->queue_mutex_);
+
+ ACE_GUARD_RETURN (TAO_REVERSE_SYNCH_MUTEX, ace_mon, reverse, -1);
+ (void) flushing_strategy->flush_transport (this);
}
// ... in any case, check for timeouts and report them to the
@@ -665,6 +676,13 @@ TAO_Transport::register_for_timer_event (const void* arg,
}
int
+TAO_Transport::queue_is_empty (void)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, 0);
+ return (this->head_ == 0);
+}
+
+int
TAO_Transport::register_handler (void)
{
ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index e132d080ceb..a3141b0e9ca 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -193,54 +193,6 @@ public:
*/
virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void);
-#if 0
- /// Send a request or queue it for later.
- /**
- * If the right policies are set queue the request for later.
- * Otherwise, or if the queue size has reached the configured
- * limits, start draining the queue.
- *
- * If any data is to be sent it blocks until the queue is completely
- * drained.
- *
- * This method serializes on handler_lock_, guaranteeing that only
- * thread can execute it on the same instance concurrently.
- *
- * @todo: this routine will probably go away as part of the
- * reorganization to support non-blocking writes.
- */
- // @@ lockme
- ssize_t send_or_buffer (TAO_Stub *stub,
- int two_way,
- const ACE_Message_Block *mblk,
- const ACE_Time_Value *s = 0);
-
- /**
- * Return the TSS leader follower condition variable used in the
- * Wait Strategy. Muxed Leader Follower implementation returns a
- * valid condition variable, others return 0.
- */
- virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void);
-
- /// Queue for buffering transport messages.
- virtual TAO_Transport_Buffering_Queue &buffering_queue (void);
-
- /// Timer id associated with buffering.
- long buffering_timer_id (void) const;
- void buffering_timer_id (long);
-
- /// Timeout value associated with buffering.
- const ACE_Time_Value &buffering_timeout_value (void) const;
- void buffering_timeout_value (const ACE_Time_Value &time);
-
- /// Send any messages that have been buffered.
- // @@ lockme
- ssize_t send_buffered_messages (const ACE_Time_Value *max_wait_time = 0);
-
- /// Send any messages that have been buffered.
- ssize_t send_buffered_messages (const ACE_Time_Value *max_wait_time = 0);
-#endif /* 0 */
-
/**
* Initialising the messaging object. This would be used by the
* connector side. On the acceptor side the connection handler
@@ -257,7 +209,8 @@ public:
/**
* Called by the cache when the cache is closing in order to fill
* in a handle_set in a lock-safe manner.
- * @param handle_set the ACE_Handle_Set into which the transport should place any handle registered with the reactor
+ * @param handle_set the ACE_Handle_Set into which the transport
+ * should place any handle registered with the reactor
*/
void provide_handle (ACE_Handle_Set &handle_set);
@@ -273,6 +226,12 @@ public:
*/
void dequeue_all (void);
+ /// Check if there are messages pending in the queue
+ /**
+ * @return 1 if the queue is empty
+ */
+ int queue_is_empty (void);
+
/// Register the handler with the reactor.
/**
* This method is used by the Wait_On_Reactor strategy. The