From 3cda7554ccedda0c19f9e4fca0bcf3d61e6dc871 Mon Sep 17 00:00:00 2001 From: sma Date: Thu, 24 Jan 2013 15:52:45 +0000 Subject: Thu Jan 24 15:51:00 UTC 2013 Simon Massey * NEWS: * doc/Options.h: * orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp: * orbsvcs/orbsvcs/PortableGroup/miop_resource.h: * orbsvcs/orbsvcs/PortableGroup/miopconf.h: * orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp: * orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h: * orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp: Added MIOP configuration options -ORBSendThrottling and -ORBEagerDequeueing, along with #define overrides for their default settings. See the descriptions in the MIOP section of doc/Options.html for their use. --- .../PortableGroup/UIPMC_Mcast_Transport.cpp | 305 ++++++++++++--------- .../orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h | 6 +- .../orbsvcs/PortableGroup/UIPMC_Transport.cpp | 12 +- .../orbsvcs/PortableGroup/miop_resource.cpp | 14 + TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h | 13 +- TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h | 8 + 6 files changed, 227 insertions(+), 131 deletions(-) (limited to 'TAO/orbsvcs/orbsvcs/PortableGroup') diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp index 6e0b8762556..22459eb5b44 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp @@ -284,115 +284,206 @@ TAO_UIPMC_Mcast_Transport::recv_packet ( return &buf[miop_header_size]; } -bool -TAO_UIPMC_Mcast_Transport::recv_all (void) +TAO_PG::UIPMC_Recv_Packet * +TAO_UIPMC_Mcast_Transport::recv_all (TAO_Resume_Handle &rh) { + const TAO_MIOP_Resource_Factory *const factory = + ACE_Dynamic_Service::instance ( + this->orb_core_->configuration(), + ACE_TEXT ("MIOP_Resource_Factory")); + const bool eager_dequeue= factory->enable_eager_dequeue (); + + // Only one thread will do recv at the same time. // FUZZ: disable check_for_ACE_Guard - // Only one thread will do recv. ACE_Guard recv_guard (this->recv_lock_, 0); // tryacquire - if (!recv_guard.locked ()) - return !this->complete_.is_empty (); // FUZZ: enable check_for_ACE_Guard - - // The buffer on the stack which will be used to hold the input - // messages. - char buf [MIOP_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT]; - char *aligned_buf = ACE_ptr_align_binary (buf, ACE_CDR::MAX_ALIGNMENT); + if (recv_guard.locked ()) + { + // The buffer on the stack which will be used to hold the input + // messages. + char buf [MIOP_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT]; + char *aligned_buf = ACE_ptr_align_binary (buf, ACE_CDR::MAX_ALIGNMENT); #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) - (void) ACE_OS::memset (buf, - '\0', - sizeof buf); + (void) ACE_OS::memset (buf, '\0', sizeof buf); #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ - while (true) - { - // This guard will cleanup expired packets each iteration. - TAO_PG::UIPMC_Recv_Packet_Cleanup_Guard guard (this); + while (true) + { + // This guard will cleanup expired packets each iteration. + TAO_PG::UIPMC_Recv_Packet_Cleanup_Guard guard (this); - ACE_INET_Addr from_addr; - CORBA::UShort packet_length; - CORBA::ULong packet_number; - bool stop_packet; - u_long id_hash; + ACE_INET_Addr from_addr; + CORBA::UShort packet_length; + CORBA::ULong packet_number; + bool stop_packet; + u_long id_hash; - char *start_data = - this->recv_packet (aligned_buf, MIOP_MAX_DGRAM_SIZE, from_addr, - packet_length, packet_number, stop_packet, id_hash); + char *start_data = + this->recv_packet (aligned_buf, MIOP_MAX_DGRAM_SIZE, from_addr, + packet_length, packet_number, stop_packet, id_hash); - // The socket buffer is empty. Try to do other useful things. - if (start_data == 0) - { - if (errno != EWOULDBLOCK && errno != EAGAIN) + // The socket buffer is empty. Try to do other useful things. + if (start_data == 0) { + if (errno != EWOULDBLOCK && errno != EAGAIN) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_all, unexpected failure of recv_packet '%m'\n"), + this->id ())); + } + break; + } + + if (TAO_debug_level >= 9) + { + char tmp[INET6_ADDRSTRLEN]; + from_addr.get_host_addr (tmp, sizeof tmp); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") - ACE_TEXT ("recv_all, unexpected failure of recv_packet '%m'\n"), - this->id ())); + ACE_TEXT ("recv, received %d bytes from <%C:%u> ") + ACE_TEXT ("(hash %d)\n"), + this->id (), + packet_length, + tmp, + from_addr.get_port_number (), + id_hash)); } - break; - } - if (TAO_debug_level >= 10) - { - char tmp[INET6_ADDRSTRLEN]; - from_addr.get_host_addr (tmp, sizeof tmp); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") - ACE_TEXT ("recv, received %d bytes from <%C:%u> ") - ACE_TEXT ("(hash %d)\n"), - this->id (), - packet_length, - tmp, - from_addr.get_port_number (), - id_hash)); - } + TAO_PG::UIPMC_Recv_Packet *packet = 0; + if (this->incomplete_.find (id_hash, packet) == -1) + { + ACE_NEW_THROW_EX (packet, + TAO_PG::UIPMC_Recv_Packet, + CORBA::NO_MEMORY ( + CORBA::SystemException::_tao_minor_code ( + TAO::VMCID, + ENOMEM), + CORBA::COMPLETED_NO)); + + if (this->incomplete_.bind (id_hash, packet) != 0) + { + // Cleanup the packet. + delete packet; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_all, could not queue fragment\n"), + this->id ())); + continue; + } + } - TAO_PG::UIPMC_Recv_Packet *packet = 0; - if (this->incomplete_.find (id_hash, packet) == -1) - { - ACE_NEW_THROW_EX (packet, - TAO_PG::UIPMC_Recv_Packet, - CORBA::NO_MEMORY ( - CORBA::SystemException::_tao_minor_code ( - TAO::VMCID, - ENOMEM), - CORBA::COMPLETED_NO)); - - if (this->incomplete_.bind (id_hash, packet) != 0) + // We have incomplete packet so add a new data to it. + // add_fragment returns 1 iff the packet is complete. + if (1 == packet->add_fragment (start_data, packet_length, + packet_number, stop_packet)) { - // Cleanup the packet. - ACE_Auto_Ptr bail_guard (packet); - continue; + // Remove this packet from incomplete packets. + this->incomplete_.unbind (id_hash); + + // If there are no completed message ahead of us AND + // we only want a single message, just return it. + if (this->complete_.is_empty () && !eager_dequeue) + { + if (TAO_debug_level >= 9) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_all, completed MIOP message %@\n"), + this->id (), static_cast (packet))); + } + + return packet; + } + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, + guard, + this->complete_lock_, + packet); + if (this->complete_.is_empty () && !eager_dequeue) + { + // Another thread dequeued the waiting MIOP message before we got + // the lock, simply return our single message, don't bother queueing + // it after all. + if (TAO_debug_level >= 9) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_all, completed MIOP message %@\n"), + this->id (), static_cast (packet))); + } + + return packet; + } + + if (TAO_debug_level >= 9) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_all, completed MIOP message %@ (QUEUED)\n"), + this->id (), static_cast (packet))); + } + + // Add it to the complete queue. + this->complete_.enqueue_tail (packet); + + // Stop attempting to queue more messages if we are not in eager mode. + if (!eager_dequeue) + break; } } + recv_guard.release (); + } - // We have incomplete packet so add a new data to it. - int const ret = packet->add_fragment (start_data, packet_length, - packet_number, stop_packet); - - // add_fragment returns 1 iff the packet is complete. - if (ret == 1) - { - // Remove this packet from incomplete packets. - this->incomplete_.unbind (id_hash); + // Ok we have received as many packets as we could, now if we have + // any completed packets queued up, return the first to the caller. + if (this->complete_.is_empty ()) + return 0; + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, complete_guard, this->complete_lock_, 0); + if (this->complete_.is_empty ()) + return 0; // Another thread got here first, not a problem. - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, - guard, - this->complete_lock_, - !this->complete_.is_empty ()); + TAO_PG::UIPMC_Recv_Packet *packet = 0; + if (this->complete_.dequeue_head (packet) == -1) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_UIPMC_Mcast_Transport[%d]::recv_all, ") + ACE_TEXT ("unable to dequeue completed message\n"), + this->id ())); + return 0; + } - // Add it to the complete queue. - this->complete_.enqueue_tail (packet); + if (TAO_debug_level >= 9) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_all, completed MIOP message %@ (DEQUEUED)\n"), + this->id (), static_cast (packet))); + } - // The following break stops the eager de-queuing of - // further completed MIOP messages. This should probably - // be configuable via a MIOP Server -ORB option. - break; + // If there is another message waiting to be processed (in addition + // to the one we have just taken off), notify another thread (if + // available) so this can also be processed in parrellel. + if (!this->complete_.is_empty ()) + { + int const retval = this->notify_reactor_now (); + if (retval == 1) + { + // Now we have handed off to another thread, let the class + // know that it doesn't need to resume with OUR handle + // after we have processed our message. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); + } + else if (retval < 0 && TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_UIPMC_Mcast_Transport[%d]::recv_all, ") + ACE_TEXT ("notify to the reactor failed.\n"), + this->id ())); } } - return !this->complete_.is_empty (); + return packet; } int @@ -411,47 +502,17 @@ TAO_UIPMC_Mcast_Transport::handle_input ( this->id ())); } - if (this->recv_all ()) + // Grab the next completed MIOP message to process from the FIFO Queue. + ACE_Auto_Ptr complete_owner (this->recv_all (rh)); + if (TAO_PG::UIPMC_Recv_Packet *complete = complete_owner.get ()) { - // Unqueue the first available completed message for us to process. - TAO_PG::UIPMC_Recv_Packet *complete = 0; - ACE_Auto_Ptr owner (0); - { - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->complete_lock_, 0); - if (this->complete_.is_empty ()) - return 0; // Another thread got here first, no problem. - if (this->complete_.dequeue_head (complete) == -1) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - TAO_UIPMC_Mcast_Transport[%d]::handle_input, ") - ACE_TEXT ("unable to dequeue completed message\n"), - this->id ())); - return 0; - } - ACE_auto_ptr_reset (owner, complete); - - // If there is another message waiting to be processed (in addition - // to the one we have just taken off to be processed), notify another - // thread (if available) so this can also be processed in parrellel. - if (!this->complete_.is_empty ()) - { - int const retval = this->notify_reactor_now (); - if (retval == 1) - { - // Now we have handed off to another thread, let the class - // know that it doesn't need to resume with OUR handle - // after we have processed our message. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); - } - else if (retval < 0 && TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - TAO_UIPMC_Mcast_Transport[%d]::handle_input, ") - ACE_TEXT ("notify to the reactor failed.\n"), - this->id ())); - } - } - } + if (TAO_debug_level >= 9) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("handle_input, processing MIOP message %@ (%d bytes)\n"), + this->id (), static_cast (complete), complete->data_length ())); + } // Create a data block from our dequeued completed message. char *buffer= 0; diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h index 406fa8a84c9..75befb1e41d 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h @@ -31,7 +31,6 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL // Forward decls. class TAO_ORB_Core; class TAO_UIPMC_Mcast_Connection_Handler; - namespace TAO_PG { class UIPMC_Recv_Packet_Cleanup_Guard; @@ -118,8 +117,9 @@ private: bool &stop_packet, u_long &id_hash) const; - /// Receive as much UDP packets as possible. - bool recv_all (void); + /// Return the next complete MIOP packet, possiably dequeueing + /// as many as are available first from the socket. + TAO_PG::UIPMC_Recv_Packet *recv_all (TAO_Resume_Handle &rh); /// Cleanup either all packets or expired only depending the /// expired_only flag. diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp index 9c9e9f2b07c..2b986c4afd9 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp @@ -330,10 +330,11 @@ TAO_UIPMC_Transport::send ( this_fragment_size-= static_cast (already_sent)) { // Make sure we don't send our fragments too quickly - this->throttle_send_rate ( - factory->max_fragment_rate (), - max_fragment_size, - this_fragment_size); + if (factory->enable_throttling ()) + this->throttle_send_rate ( + factory->max_fragment_rate (), + max_fragment_size, + this_fragment_size); // Haven't sent some of the data yet, we need to adjust the fragments iov's // to skip the data we have actually manage to send so far. @@ -382,7 +383,8 @@ TAO_UIPMC_Transport::send ( } // Keep a note of the number of bytes we have just buffered - this->total_bytes_outstanding_+= static_cast (already_sent); + if (factory->enable_throttling ()) + this->total_bytes_outstanding_+= static_cast (already_sent); } // Keep sending the rest of the fragment // Increment the number of bytes of payload transferred. diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp index f61ff35a2f3..86002c5f9b2 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp @@ -20,6 +20,8 @@ TAO_MIOP_Resource_Factory::TAO_MIOP_Resource_Factory (void) , send_hi_water_mark_ (0u) // Zero sets this to actual -ORBSndSock , send_buffer_size_ (0u) // Zero is unspecified (-ORBSndSock). , receive_buffer_size_ (0u) // Zero is unspecified (-ORBRcvSock). + , enable_throttling_ (!!(TAO_DEFAULT_MIOP_SEND_THROTTLING)) // Client-side SendRate throttling enabled. + , enable_eager_dequeue_ (!!(TAO_DEFAULT_MIOP_EAGER_DEQUEUEING)) // Server-side Multiple message dequeueing. { } @@ -342,6 +344,18 @@ TAO_MIOP_Resource_Factory::receive_buffer_size (void) const return receive_buffer_size_; } +bool +TAO_MIOP_Resource_Factory::enable_throttling (void) const +{ + return enable_throttling_; +} + +bool +TAO_MIOP_Resource_Factory::enable_eager_dequeue (void) const +{ + return enable_eager_dequeue_; +} + TAO_END_VERSIONED_NAMESPACE_DECL // **************************************************************** diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h index a4cfac1b51d..8f45618c04d 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h @@ -90,8 +90,13 @@ public: u_long receive_buffer_size (void) const; //@} -private: + /// Get the client-side transmission rate throttling enable flag. + bool enable_throttling (void) const; + + /// Get the server-side eager complete message dequeuing enable flag. + bool enable_eager_dequeue (void) const; +private: enum Fragments_Cleanup_Strategy_Type { TAO_MIOP_CLEANUP_TIME_BOUND, @@ -122,6 +127,12 @@ private: /// Get the desired socket receive buffer's size in bytes. u_long receive_buffer_size_; + + /// Get the client-side transmission rate throttling enable flag. + bool enable_throttling_; + + /// Get the server-side eager complete message dequeuing enable flag. + bool enable_eager_dequeue_; }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h b/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h index b45947b340a..49afdf15567 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h @@ -98,6 +98,14 @@ static u_long const TAO_DEFAULT_MIOP_FRAGMENT_SIZE = MIOP_MAX_DGRAM_SIZE; static u_long const TAO_DEFAULT_MIOP_MAX_FRAGMENTS = 0u; // Zero is unlimited #endif +#if !defined (TAO_DEFAULT_MIOP_SEND_THROTTLING) +static bool const TAO_DEFAULT_MIOP_SEND_THROTTLING = true; // Enabled +#endif + +#if !defined (TAO_DEFAULT_MIOP_EAGER_DEQUEUEING) +static bool const TAO_DEFAULT_MIOP_EAGER_DEQUEUEING = true; // Enabled +#endif + static CORBA::Octet const miop_magic[4] = { 0x4d, 0x49, 0x4f, 0x50 }; // in ASCII this is 'M', 'I', 'O', 'P' -- cgit v1.2.1