summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp417
1 files changed, 78 insertions, 339 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index e07169bb109..6cfccf24bab 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -20,7 +20,6 @@
#include "Thread_Lane_Resources.h"
#include "debug.h"
#include "Resume_Handle.h"
-#include "Notify_Handler.h"
#include "ace/Message_Block.h"
#include "ace/Reactor.h"
@@ -96,16 +95,9 @@ dump_iov (iovec *iov, int iovcnt, size_t id,
ACE_Log_Msg::instance ()->release ();
}
-/*
- * Definitions for methods declared in the transport class
- *
- */
-
-// Constructor.
TAO_Transport::TAO_Transport (CORBA::ULong tag,
TAO_ORB_Core *orb_core)
- : TAO_Synch_Refcountable (orb_core->resource_factory ()->create_cached_connection_lock (), 1)
- , tag_ (tag)
+ : tag_ (tag)
, orb_core_ (orb_core)
, cache_map_entry_ (0)
, bidirectional_flag_ (-1)
@@ -135,8 +127,6 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
TAO_Transport::~TAO_Transport (void)
{
- ACE_ASSERT(this->refcount() == 0);
-
delete this->ws_;
delete this->tms_;
@@ -145,122 +135,17 @@ TAO_Transport::~TAO_Transport (void)
// By the time the destructor is reached all the connection stuff
// *must* have been cleaned up
- ACE_ASSERT(this->head_ == 0);
- ACE_ASSERT(this->cache_map_entry_ == 0);
-}
-
-
-/*
- *
- * Public utility methods that are called by other classes.
- *
- */
-/*static*/ TAO_Transport*
-TAO_Transport::_duplicate (TAO_Transport* transport)
-{
- if (transport != 0)
- {
- transport->increment ();
- }
- return transport;
-}
-
-/*static*/ void
-TAO_Transport::release (TAO_Transport* transport)
-{
- if (transport != 0)
- {
- int count = transport->decrement ();
-
- if (count == 0)
- {
- delete transport;
- }
- else if (count < 0)
- {
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport[%d]::release, "
- "reference count is less than zero: %d\n",
- transport->id (), count));
- ACE_OS::abort ();
- }
- }
+ ACE_ASSERT (this->head_ == 0);
+ ACE_ASSERT (this->cache_map_entry_ == 0);
}
-
void
-TAO_Transport::provide_handle (ACE_Handle_Set &reactor_registered,
- TAO_EventHandlerSet &unregistered)
-{
- ACE_MT (ACE_GUARD (ACE_Lock,
- guard,
- *this->handler_lock_));
- ACE_Event_Handler *eh = this->event_handler_i ();
-
- if (eh != 0)
- {
- if (this->ws_->is_registered ())
- {
- reactor_registered.set_bit (eh->get_handle ());
- }
- else
- {
- unregistered.insert (eh);
- }
- }
-}
-
-
-int
-TAO_Transport::register_handler (void)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->handler_lock_,
- -1));
- if (this->check_event_handler_i ("Transport::register_handler") == -1)
- return -1;
-
- return this->register_handler_i ();
-}
-
-
-ssize_t
-TAO_Transport::send (iovec *iov, int iovcnt,
- size_t &bytes_transferred,
- const ACE_Time_Value *timeout)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->handler_lock_,
- -1));
-
- if (this->check_event_handler_i ("Transport::send") == -1)
- return -1;
-
- // now call the template method
- return this->send_i (iov, iovcnt, bytes_transferred, timeout);
-}
-
-
-ssize_t
-TAO_Transport::recv (char *buffer,
- size_t len,
- const ACE_Time_Value *timeout)
+TAO_Transport::provide_handler (TAO_Connection_Handler_Set &handlers)
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->handler_lock_,
- -1));
-
- if (this->check_event_handler_i ("Transport::recv") == -1)
- return -1;
-
- // now call the template method
- return this->recv_i (buffer, len, timeout);
+ this->add_reference ();
+ handlers.insert (this->connection_handler_i ());
}
-
int
TAO_Transport::idle_after_send (void)
{
@@ -282,8 +167,32 @@ TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
void
TAO_Transport::close_connection (void)
{
- TAO_Connection_Handler * eh = this->invalidate_event_handler ();
- this->close_connection_shared (1, eh);
+ this->connection_handler_i ()->close_connection ();
+}
+
+int
+TAO_Transport::register_handler (void)
+{
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::register_handler\n",
+ this->id ()));
+ }
+
+ ACE_Reactor *r = this->orb_core_->reactor ();
+
+ if (r == this->event_handler_i ()->reactor ())
+ return 0;
+
+ // Set the flag in the Connection Handler and in the Wait Strategy
+ // @@Maybe we should set these flags after registering with the
+ // reactor. What if the registration fails???
+ this->ws_->is_registered (1);
+
+ // Register the handler with the reactor
+ return r->register_handler (this->event_handler_i (),
+ ACE_Event_Handler::READ_MASK);
}
int
@@ -308,7 +217,6 @@ TAO_Transport::generate_locate_request (
return 0;
}
-
int
TAO_Transport::generate_request_header (
TAO_Operation_Details &opdetails,
@@ -318,8 +226,8 @@ TAO_Transport::generate_request_header (
// codeset service context is only supposed to be sent in the first request
// on a particular connection.
if (this->first_request_)
- this->orb_core()->codeset_manager()->
- generate_service_context( opdetails, *this );
+ this->orb_core ()->codeset_manager ()->
+ generate_service_context (opdetails, *this);
if (this->messaging_object ()->generate_request_header (opdetails,
spec,
@@ -337,32 +245,6 @@ TAO_Transport::generate_request_header (
return 0;
}
-
-/*
- * NOTE: Some of these calls looks like ideal fodder for
- * inlining. But, please note that the calls made within the method
- * are not inlined. This would increase closure cost on the
- * compiler.
- */
-void
-TAO_Transport::connection_handler_closing (void)
-{
- // The connection has closed, we must invalidate the handler to
- // ensure that any attempt to use this transport results in an
- // error. Basically all the other methods in the Transport
- // cooperate via check_event_handler_i()
-
- TAO_Connection_Handler * eh = this->invalidate_event_handler ();
- this->send_connection_closed_notifications ();
-
- if (eh != 0)
- {
- // REFCNT: Matches incr_refcnt in XXX_Transport::XXX_Transport
- // REFCNT: Only one of this or close_connection_shared() run
- eh->decr_refcount();
- }
-}
-
// @@TODO: Ideally the following should be inline.
int
TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
@@ -378,7 +260,7 @@ TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
void
TAO_Transport::purge_entry (void)
{
- (void) this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
+ this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
}
int
@@ -393,7 +275,6 @@ TAO_Transport::update_transport (void)
return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
}
-
/*
*
* Methods called and used in the output path of the ORB.
@@ -427,7 +308,6 @@ TAO_Transport::handle_output (void)
return retval;
}
-
int
TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
size_t &bytes_transferred,
@@ -435,9 +315,6 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
- if (this->check_event_handler_i ("Transport::send_message_block_chain") == -1)
- return -1;
-
return this->send_message_block_chain_i (mb,
bytes_transferred,
max_wait_time);
@@ -490,22 +367,22 @@ TAO_Transport::send_message_shared (TAO_Stub *stub,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time)
{
- int r;
+ int result;
+
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
- if (this->check_event_handler_i ("Transport::send_message_shared") == -1)
- return -1;
-
- r = this->send_message_shared_i (stub, message_semantics,
- message_block, max_wait_time);
- }
- if (r == -1)
- {
- this->close_connection ();
+ result =
+ this->send_message_shared_i (stub, message_semantics,
+ message_block, max_wait_time);
}
- return r;
+ if (result == -1)
+ {
+ this->close_connection ();
+ }
+
+ return result;
}
int
@@ -686,74 +563,6 @@ TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_mess
return 0;
}
-
-void
-TAO_Transport::close_connection_i (void)
-{
- TAO_Connection_Handler * eh = this->invalidate_event_handler_i ();
- this->close_connection_shared (1, eh);
-}
-
-void
-TAO_Transport::close_connection_no_purge (void)
-{
- TAO_Connection_Handler * eh = this->invalidate_event_handler ();
-
- this->close_connection_shared (0,
- eh);
-}
-
-void
-TAO_Transport::close_connection_shared (int purge,
- TAO_Connection_Handler * eh)
-{
- // Purge the entry
- if (purge)
- {
- this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
- }
-
- if (eh == 0)
- {
- // The connection was already closed
- return;
- }
-
- // Set the event handler in the connection close wait state.
- (void) eh->connection_close_wait ();
-
- // NOTE: If the wait strategy is in blocking mode, then there is no
- // chance that it could be inside the reactor. We can safely skip
- // driving the LF. If <purge> is 0, then we are cleaned up by the
- // cache. So no point in driving the LF either.
- if (this->ws_->non_blocking () && purge)
- {
- // NOTE: This is a work around for BUG 1020. We drive the leader
- // follower for a predetermined amount of time. Ideally this
- // needs to be an ORB option. But this is just the first
- // cut. Doing that will be a todo..
-
- ACE_Time_Value tv (ACE_DEFAULT_TIMEOUT, 0);
- this->orb_core_->leader_follower ().wait_for_event (eh,
- this,
- &tv);
-
- }
-
- // We need to explicitly shut it down to avoid memory leaks.
- if (!eh->successful () ||
- !this->ws_->non_blocking ())
- {
- eh->close_connection ();
- }
-
- this->send_connection_closed_notifications ();
-
- // REFCNT: Matches incr_refcnt in XXX_Transport::XXX_Transport
- // REFCNT: Only one of this or connection_handler_closing() run
- eh->decr_refcount ();
-}
-
int
TAO_Transport::queue_is_empty_i (void)
{
@@ -765,12 +574,7 @@ int
TAO_Transport::schedule_output_i (void)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
-
ACE_Reactor *reactor = eh->reactor ();
- if (reactor == 0)
- return -1;
if (TAO_debug_level > 3)
{
@@ -786,12 +590,7 @@ int
TAO_Transport::cancel_output_i (void)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
-
ACE_Reactor *reactor = eh->reactor ();
- if (reactor == 0)
- return -1;
if (TAO_debug_level > 3)
{
@@ -857,14 +656,11 @@ TAO_Transport::drain_queue (void)
int
TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
{
- if (this->check_event_handler_i ("Transport::drain_queue_helper") == -1)
- return -1;
-
size_t byte_count = 0;
// ... send the message ...
ssize_t retval =
- this->send_i (iov, iovcnt, byte_count);
+ this->send (iov, iovcnt, byte_count);
if (TAO_debug_level == 5)
{
@@ -896,7 +692,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
"error during %p\n",
- this->id (), "send_i()"));
+ this->id (), "send()"));
}
if (errno == EWOULDBLOCK)
return 0;
@@ -981,14 +777,8 @@ TAO_Transport::drain_queue_i (void)
if (this->flush_timer_pending ())
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh != 0)
- {
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor != 0)
- {
- (void) reactor->cancel_timer (this->flush_timer_id_);
- }
- }
+ ACE_Reactor *reactor = eh->reactor ();
+ reactor->cancel_timer (this->flush_timer_id_);
this->reset_flush_timer ();
}
return 1;
@@ -1063,25 +853,19 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub,
if (set_timer)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh != 0)
- {
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor != 0)
- {
- this->current_deadline_ = new_deadline;
- ACE_Time_Value delay =
- new_deadline - ACE_OS::gettimeofday ();
+ ACE_Reactor *reactor = eh->reactor ();
+ this->current_deadline_ = new_deadline;
+ ACE_Time_Value delay =
+ new_deadline - ACE_OS::gettimeofday ();
- if (this->flush_timer_pending ())
- {
- (void) reactor->cancel_timer (this->flush_timer_id_);
- }
- this->flush_timer_id_ =
- reactor->schedule_timer (&this->transport_timer_,
- &this->current_deadline_,
- delay);
- }
+ if (this->flush_timer_pending ())
+ {
+ reactor->cancel_timer (this->flush_timer_id_);
}
+ this->flush_timer_id_ =
+ reactor->schedule_timer (&this->transport_timer_,
+ &this->current_deadline_,
+ delay);
}
return constraints_reached;
@@ -1099,14 +883,6 @@ TAO_Transport::report_invalid_event_handler (const char *caller)
}
}
-TAO_Connection_Handler *
-TAO_Transport::invalidate_event_handler (void)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, 0));
-
- return this->invalidate_event_handler_i ();
-}
-
void
TAO_Transport::send_connection_closed_notifications (void)
{
@@ -1152,7 +928,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
max_wait_time);
}
-
// Let's figure out if the message should be queued without trying
// to send first:
int try_sending_first = 1;
@@ -1287,8 +1062,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
return 0;
}
-
-
/*
*
* All the methods relevant to the incoming data path of the ORB are
@@ -1296,9 +1069,9 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
*
*/
int
-TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
- ACE_Time_Value * max_wait_time,
- int /*block*/)
+TAO_Transport::handle_input (TAO_Resume_Handle &rh,
+ ACE_Time_Value * max_wait_time,
+ int /*block*/)
{
if (TAO_debug_level > 3)
{
@@ -1319,9 +1092,8 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
"TAO (%P|%t) - Transport[%d]::handle_input_i, "
"error while parsing the head of the queue\n",
this->id()));
-
- this->send_connection_closed_notifications ();
}
+
return retval;
}
@@ -1376,14 +1148,7 @@ TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
// If there is an error return to the reactor..
if (n <= 0)
- {
- if (n == -1)
- {
- this->send_connection_closed_notifications ();
- }
-
- return n;
- }
+ return n;
if (TAO_debug_level > 2)
{
@@ -1492,7 +1257,6 @@ TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
"error in incoming message\n",
this->id ()));
- this->send_connection_closed_notifications ();
return -1;
}
}
@@ -1584,7 +1348,7 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
"error while trying to consolidate\n",
this->id ()));
}
- this->send_connection_closed_notifications ();
+
return -1;
}
@@ -1936,8 +1700,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
"received CloseConnection message %p\n",
this->id(), ""));
- this->send_connection_closed_notifications ();
-
// Return a "-1" so that the next stage can take care of
// closing connection and the necessary memory management.
return -1;
@@ -1945,17 +1707,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST ||
t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST)
{
- // Ready to process a request. Increment the refcount of <this
- // transport>. Theoretically, after handler resumption another
- // thread can access this very same transport and can even close
- // this. To have a valid Transport object for further processing
- // we should increment the refcount. Please see Bug 1382 for
- // more details.
- // REFCNT: Matched by the release before returning.
-
- // This generic class takes care of everything.
- TAO_Transport_Refcount_Guard rg (this);
-
// Let us resume the handle before we go ahead to process the
// request. This will open up the handle for other threads.
rh.resume_handle ();
@@ -1964,9 +1715,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
this,
qd) == -1)
{
- this->send_connection_closed_notifications ();
-
-
// Return a "-1" so that the next stage can take care of
// closing connection and the necessary memory management.
return -1;
@@ -1975,8 +1723,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
else if (t == TAO_PLUGGABLE_MESSAGE_REPLY ||
t == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
{
- // Please see ..else if (XXX_REQUEST) for whys and whats..
- TAO_Transport_Refcount_Guard rg (this);
rh.resume_handle ();
// @@todo: Maybe the input_cdr can be constructed from the
@@ -1993,7 +1739,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
"error in process_reply_message %p\n",
this->id (), ""));
- this->send_connection_closed_notifications ();
return -1;
}
@@ -2009,7 +1754,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
"dispatch reply failed\n",
this->id ()));
- this->send_connection_closed_notifications ();
return -1;
}
@@ -2160,25 +1904,10 @@ TAO_Transport::notify_reactor (void)
ACE_Event_Handler *eh =
this->event_handler_i ();
- if (eh == 0)
- return -1;
-
// Get the reactor associated with the event handler
ACE_Reactor *reactor =
this->orb_core ()->reactor ();
- if (reactor == 0)
- return -1;
-
- // NOTE: Instead of creating the handler seperately, it would be
- // awesome if we could create the handler when we create the
- // TAO_Queued_Data. That would save us an allocation.
- TAO_Notify_Handler *nh =
- TAO_Notify_Handler::create_handler (
- this,
- eh->get_handle (),
- this->orb_core ()->transport_message_buffer_allocator ());
-
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
@@ -2189,7 +1918,7 @@ TAO_Transport::notify_reactor (void)
// Send a notification to the reactor...
- int retval = reactor->notify (nh,
+ int retval = reactor->notify (eh,
ACE_Event_Handler::READ_MASK);
if (retval < 0 && TAO_debug_level > 2)
@@ -2226,7 +1955,17 @@ TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
}
}
+ACE_Event_Handler::Reference_Count
+TAO_Transport::add_reference (void)
+{
+ return this->event_handler_i ()->add_reference ();
+}
+ACE_Event_Handler::Reference_Count
+TAO_Transport::remove_reference (void)
+{
+ return this->event_handler_i ()->remove_reference ();
+}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)