diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-11 17:23:01 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-11 17:23:01 +0000 |
commit | a55b89c8797fffd6f7ce8159079372837645e3d8 (patch) | |
tree | 868c23822ee919ce3c2d5f2e1f5341cf28136b61 | |
parent | 7ae0b01904341fd5c1421123b9397927718384a0 (diff) | |
download | ATCD-a55b89c8797fffd6f7ce8159079372837645e3d8.tar.gz |
ChangeLogTag:Wed Apr 11 10:21:35 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 22 | ||||
-rw-r--r-- | TAO/tao/Block_Flushing_Strategy.cpp | 12 | ||||
-rw-r--r-- | TAO/tao/Block_Flushing_Strategy.h | 1 | ||||
-rw-r--r-- | TAO/tao/Flushing_Strategy.h | 3 | ||||
-rw-r--r-- | TAO/tao/Reactive_Flushing_Strategy.cpp | 27 | ||||
-rw-r--r-- | TAO/tao/Reactive_Flushing_Strategy.h | 1 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 22 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 57 |
8 files changed, 94 insertions, 51 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index e4a1315b0f4..a95e7a9e5b0 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,25 @@ +Wed Apr 11 10:21:35 2001 Carlos O'Ryan <coryan@uci.edu> + + * With the following changes the semantics of oneways are + bug-compatible with the main trunk. + + * tao/Transport.cpp: + Add new method to check if the transport queue has any data + pending. + + * tao/Flushing_Strategy.h: + Add new operation to block until the queue becomes completely + empty. + + * tao/Block_Flushing_Strategy.h: + * tao/Block_Flushing_Strategy.cpp: + * tao/Reactive_Flushing_Strategy.h: + * tao/Reactive_Flushing_Strategy.cpp: + Implement flush_transport() in each concrete strategy. + + * tao/Transport.h: + Remove old code that was commented out anyway. + Mon Apr 09 00:41:20 2001 Carlos O'Ryan <coryan@uci.edu> * tao/Invocation.cpp: diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp index 4703a5e6455..c5e680f7290 100644 --- a/TAO/tao/Block_Flushing_Strategy.cpp +++ b/TAO/tao/Block_Flushing_Strategy.cpp @@ -31,3 +31,15 @@ TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport, } return 0; } + +int +TAO_Block_Flushing_Strategy::flush_transport (TAO_Transport *transport) +{ + while (!transport->queue_is_empty ()) + { + int result = transport->handle_output (); + if (result == -1) + return -1; + } + return 0; +} diff --git a/TAO/tao/Block_Flushing_Strategy.h b/TAO/tao/Block_Flushing_Strategy.h index 82af3a0cd3e..f2524ac2e7b 100644 --- a/TAO/tao/Block_Flushing_Strategy.h +++ b/TAO/tao/Block_Flushing_Strategy.h @@ -32,6 +32,7 @@ public: virtual int cancel_output (TAO_Transport *transport); virtual int flush_message (TAO_Transport *transport, TAO_Queued_Message *msg); + virtual int flush_transport (TAO_Transport *transport); }; #include "ace/post.h" diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h index 11d6cda2983..5d1257a802c 100644 --- a/TAO/tao/Flushing_Strategy.h +++ b/TAO/tao/Flushing_Strategy.h @@ -59,6 +59,9 @@ public: /// flushed too, for example, because there are ahead in the queue. virtual int flush_message (TAO_Transport *transport, TAO_Queued_Message *msg) = 0; + + /// Wait until the transport has no messages queued. + virtual int flush_transport (TAO_Transport *transport) = 0; }; #include "ace/post.h" diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp index 8bf90a36a1d..ef3d1521924 100644 --- a/TAO/tao/Reactive_Flushing_Strategy.cpp +++ b/TAO/tao/Reactive_Flushing_Strategy.cpp @@ -47,3 +47,30 @@ TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport, return result; } + +int +TAO_Reactive_Flushing_Strategy::flush_transport (TAO_Transport *transport) +{ + // @@ Should we pass this down? Can we? + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + TAO_ORB_Core *orb_core = transport->orb_core (); + + while (!transport->queue_is_empty ()) + { + int result = orb_core->run (0, 1, ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (result == -1) + return -1; + } + } + ACE_CATCHANY + { + return -1; + } + ACE_ENDTRY; + + return 0; +} diff --git a/TAO/tao/Reactive_Flushing_Strategy.h b/TAO/tao/Reactive_Flushing_Strategy.h index c1a33aaef5b..16371c75253 100644 --- a/TAO/tao/Reactive_Flushing_Strategy.h +++ b/TAO/tao/Reactive_Flushing_Strategy.h @@ -32,6 +32,7 @@ public: virtual int cancel_output (TAO_Transport *transport); virtual int flush_message (TAO_Transport *transport, TAO_Queued_Message *msg); + virtual int flush_transport (TAO_Transport *transport); }; #include "ace/post.h" diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 19fc4c2366b..a5ed9ddd8c1 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -319,6 +319,11 @@ TAO_Transport::send_message_i (TAO_Stub *stub, size_t byte_count = 0; ssize_t n; + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + int start_flushing = 1; if (try_sending_first) { // ... in this case we must try to send the message first ... @@ -365,6 +370,8 @@ TAO_Transport::send_message_i (TAO_Stub *stub, } } + (void) flushing_strategy->schedule_output (this); + // ... either the message must be queued or we need to queue it // because it was not completely sent out ... @@ -385,9 +392,13 @@ TAO_Transport::send_message_i (TAO_Stub *stub, // ... if the queue is full we need to activate the output on the // queue ... - if (this->must_flush_queue_i (stub)) + if (start_flushing || this->must_flush_queue_i (stub)) { - this->orb_core ()->flushing_strategy ()->schedule_output (this); + typedef ACE_Reverse_Lock<TAO_SYNCH_MUTEX> TAO_REVERSE_SYNCH_MUTEX; + TAO_REVERSE_SYNCH_MUTEX reverse (this->queue_mutex_); + + ACE_GUARD_RETURN (TAO_REVERSE_SYNCH_MUTEX, ace_mon, reverse, -1); + (void) flushing_strategy->flush_transport (this); } // ... in any case, check for timeouts and report them to the @@ -665,6 +676,13 @@ TAO_Transport::register_for_timer_event (const void* arg, } int +TAO_Transport::queue_is_empty (void) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, 0); + return (this->head_ == 0); +} + +int TAO_Transport::register_handler (void) { ACE_MT (ACE_GUARD_RETURN (ACE_Lock, diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index e132d080ceb..a3141b0e9ca 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -193,54 +193,6 @@ public: */ virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void); -#if 0 - /// Send a request or queue it for later. - /** - * If the right policies are set queue the request for later. - * Otherwise, or if the queue size has reached the configured - * limits, start draining the queue. - * - * If any data is to be sent it blocks until the queue is completely - * drained. - * - * This method serializes on handler_lock_, guaranteeing that only - * thread can execute it on the same instance concurrently. - * - * @todo: this routine will probably go away as part of the - * reorganization to support non-blocking writes. - */ - // @@ lockme - ssize_t send_or_buffer (TAO_Stub *stub, - int two_way, - const ACE_Message_Block *mblk, - const ACE_Time_Value *s = 0); - - /** - * Return the TSS leader follower condition variable used in the - * Wait Strategy. Muxed Leader Follower implementation returns a - * valid condition variable, others return 0. - */ - virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void); - - /// Queue for buffering transport messages. - virtual TAO_Transport_Buffering_Queue &buffering_queue (void); - - /// Timer id associated with buffering. - long buffering_timer_id (void) const; - void buffering_timer_id (long); - - /// Timeout value associated with buffering. - const ACE_Time_Value &buffering_timeout_value (void) const; - void buffering_timeout_value (const ACE_Time_Value &time); - - /// Send any messages that have been buffered. - // @@ lockme - ssize_t send_buffered_messages (const ACE_Time_Value *max_wait_time = 0); - - /// Send any messages that have been buffered. - ssize_t send_buffered_messages (const ACE_Time_Value *max_wait_time = 0); -#endif /* 0 */ - /** * Initialising the messaging object. This would be used by the * connector side. On the acceptor side the connection handler @@ -257,7 +209,8 @@ public: /** * Called by the cache when the cache is closing in order to fill * in a handle_set in a lock-safe manner. - * @param handle_set the ACE_Handle_Set into which the transport should place any handle registered with the reactor + * @param handle_set the ACE_Handle_Set into which the transport + * should place any handle registered with the reactor */ void provide_handle (ACE_Handle_Set &handle_set); @@ -273,6 +226,12 @@ public: */ void dequeue_all (void); + /// Check if there are messages pending in the queue + /** + * @return 1 if the queue is empty + */ + int queue_is_empty (void); + /// Register the handler with the reactor. /** * This method is used by the Wait_On_Reactor strategy. The |