summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/PortableGroup
diff options
context:
space:
mode:
authorsma <sma@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2013-01-24 15:52:45 +0000
committersma <sma@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2013-01-24 15:52:45 +0000
commit3cda7554ccedda0c19f9e4fca0bcf3d61e6dc871 (patch)
tree667c659207125a6aaab4cc7dce105d85b3ff8409 /TAO/orbsvcs/orbsvcs/PortableGroup
parent5c422b232d0a6a80530bc65ce88fc0910b370d7d (diff)
downloadATCD-3cda7554ccedda0c19f9e4fca0bcf3d61e6dc871.tar.gz
Thu Jan 24 15:51:00 UTC 2013 Simon Massey <simon dot massey at prismtech dot com>
* NEWS: * doc/Options.h: * orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp: * orbsvcs/orbsvcs/PortableGroup/miop_resource.h: * orbsvcs/orbsvcs/PortableGroup/miopconf.h: * orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp: * orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h: * orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp: Added MIOP configuration options -ORBSendThrottling and -ORBEagerDequeueing, along with #define overrides for their default settings. See the descriptions in the MIOP section of doc/Options.html for their use.
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/PortableGroup')
-rw-r--r--TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp305
-rw-r--r--TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h6
-rw-r--r--TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp12
-rw-r--r--TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp14
-rw-r--r--TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h13
-rw-r--r--TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h8
6 files changed, 227 insertions, 131 deletions
diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp
index 6e0b8762556..22459eb5b44 100644
--- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp
+++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp
@@ -284,115 +284,206 @@ TAO_UIPMC_Mcast_Transport::recv_packet (
return &buf[miop_header_size];
}
-bool
-TAO_UIPMC_Mcast_Transport::recv_all (void)
+TAO_PG::UIPMC_Recv_Packet *
+TAO_UIPMC_Mcast_Transport::recv_all (TAO_Resume_Handle &rh)
{
+ const TAO_MIOP_Resource_Factory *const factory =
+ ACE_Dynamic_Service<TAO_MIOP_Resource_Factory>::instance (
+ this->orb_core_->configuration(),
+ ACE_TEXT ("MIOP_Resource_Factory"));
+ const bool eager_dequeue= factory->enable_eager_dequeue ();
+
+ // Only one thread will do recv at the same time.
// FUZZ: disable check_for_ACE_Guard
- // Only one thread will do recv.
ACE_Guard<TAO_SYNCH_MUTEX> recv_guard (this->recv_lock_, 0); // tryacquire
- if (!recv_guard.locked ())
- return !this->complete_.is_empty ();
// FUZZ: enable check_for_ACE_Guard
-
- // The buffer on the stack which will be used to hold the input
- // messages.
- char buf [MIOP_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
- char *aligned_buf = ACE_ptr_align_binary (buf, ACE_CDR::MAX_ALIGNMENT);
+ if (recv_guard.locked ())
+ {
+ // The buffer on the stack which will be used to hold the input
+ // messages.
+ char buf [MIOP_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
+ char *aligned_buf = ACE_ptr_align_binary (buf, ACE_CDR::MAX_ALIGNMENT);
#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
- (void) ACE_OS::memset (buf,
- '\0',
- sizeof buf);
+ (void) ACE_OS::memset (buf, '\0', sizeof buf);
#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
- while (true)
- {
- // This guard will cleanup expired packets each iteration.
- TAO_PG::UIPMC_Recv_Packet_Cleanup_Guard guard (this);
+ while (true)
+ {
+ // This guard will cleanup expired packets each iteration.
+ TAO_PG::UIPMC_Recv_Packet_Cleanup_Guard guard (this);
- ACE_INET_Addr from_addr;
- CORBA::UShort packet_length;
- CORBA::ULong packet_number;
- bool stop_packet;
- u_long id_hash;
+ ACE_INET_Addr from_addr;
+ CORBA::UShort packet_length;
+ CORBA::ULong packet_number;
+ bool stop_packet;
+ u_long id_hash;
- char *start_data =
- this->recv_packet (aligned_buf, MIOP_MAX_DGRAM_SIZE, from_addr,
- packet_length, packet_number, stop_packet, id_hash);
+ char *start_data =
+ this->recv_packet (aligned_buf, MIOP_MAX_DGRAM_SIZE, from_addr,
+ packet_length, packet_number, stop_packet, id_hash);
- // The socket buffer is empty. Try to do other useful things.
- if (start_data == 0)
- {
- if (errno != EWOULDBLOCK && errno != EAGAIN)
+ // The socket buffer is empty. Try to do other useful things.
+ if (start_data == 0)
{
+ if (errno != EWOULDBLOCK && errno != EAGAIN)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::")
+ ACE_TEXT ("recv_all, unexpected failure of recv_packet '%m'\n"),
+ this->id ()));
+ }
+ break;
+ }
+
+ if (TAO_debug_level >= 9)
+ {
+ char tmp[INET6_ADDRSTRLEN];
+ from_addr.get_host_addr (tmp, sizeof tmp);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::")
- ACE_TEXT ("recv_all, unexpected failure of recv_packet '%m'\n"),
- this->id ()));
+ ACE_TEXT ("recv, received %d bytes from <%C:%u> ")
+ ACE_TEXT ("(hash %d)\n"),
+ this->id (),
+ packet_length,
+ tmp,
+ from_addr.get_port_number (),
+ id_hash));
}
- break;
- }
- if (TAO_debug_level >= 10)
- {
- char tmp[INET6_ADDRSTRLEN];
- from_addr.get_host_addr (tmp, sizeof tmp);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::")
- ACE_TEXT ("recv, received %d bytes from <%C:%u> ")
- ACE_TEXT ("(hash %d)\n"),
- this->id (),
- packet_length,
- tmp,
- from_addr.get_port_number (),
- id_hash));
- }
+ TAO_PG::UIPMC_Recv_Packet *packet = 0;
+ if (this->incomplete_.find (id_hash, packet) == -1)
+ {
+ ACE_NEW_THROW_EX (packet,
+ TAO_PG::UIPMC_Recv_Packet,
+ CORBA::NO_MEMORY (
+ CORBA::SystemException::_tao_minor_code (
+ TAO::VMCID,
+ ENOMEM),
+ CORBA::COMPLETED_NO));
+
+ if (this->incomplete_.bind (id_hash, packet) != 0)
+ {
+ // Cleanup the packet.
+ delete packet;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::")
+ ACE_TEXT ("recv_all, could not queue fragment\n"),
+ this->id ()));
+ continue;
+ }
+ }
- TAO_PG::UIPMC_Recv_Packet *packet = 0;
- if (this->incomplete_.find (id_hash, packet) == -1)
- {
- ACE_NEW_THROW_EX (packet,
- TAO_PG::UIPMC_Recv_Packet,
- CORBA::NO_MEMORY (
- CORBA::SystemException::_tao_minor_code (
- TAO::VMCID,
- ENOMEM),
- CORBA::COMPLETED_NO));
-
- if (this->incomplete_.bind (id_hash, packet) != 0)
+ // We have incomplete packet so add a new data to it.
+ // add_fragment returns 1 iff the packet is complete.
+ if (1 == packet->add_fragment (start_data, packet_length,
+ packet_number, stop_packet))
{
- // Cleanup the packet.
- ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> bail_guard (packet);
- continue;
+ // Remove this packet from incomplete packets.
+ this->incomplete_.unbind (id_hash);
+
+ // If there are no completed message ahead of us AND
+ // we only want a single message, just return it.
+ if (this->complete_.is_empty () && !eager_dequeue)
+ {
+ if (TAO_debug_level >= 9)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::")
+ ACE_TEXT ("recv_all, completed MIOP message %@\n"),
+ this->id (), static_cast<void *> (packet)));
+ }
+
+ return packet;
+ }
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
+ guard,
+ this->complete_lock_,
+ packet);
+ if (this->complete_.is_empty () && !eager_dequeue)
+ {
+ // Another thread dequeued the waiting MIOP message before we got
+ // the lock, simply return our single message, don't bother queueing
+ // it after all.
+ if (TAO_debug_level >= 9)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::")
+ ACE_TEXT ("recv_all, completed MIOP message %@\n"),
+ this->id (), static_cast<void *> (packet)));
+ }
+
+ return packet;
+ }
+
+ if (TAO_debug_level >= 9)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::")
+ ACE_TEXT ("recv_all, completed MIOP message %@ (QUEUED)\n"),
+ this->id (), static_cast<void *> (packet)));
+ }
+
+ // Add it to the complete queue.
+ this->complete_.enqueue_tail (packet);
+
+ // Stop attempting to queue more messages if we are not in eager mode.
+ if (!eager_dequeue)
+ break;
}
}
+ recv_guard.release ();
+ }
- // We have incomplete packet so add a new data to it.
- int const ret = packet->add_fragment (start_data, packet_length,
- packet_number, stop_packet);
-
- // add_fragment returns 1 iff the packet is complete.
- if (ret == 1)
- {
- // Remove this packet from incomplete packets.
- this->incomplete_.unbind (id_hash);
+ // Ok we have received as many packets as we could, now if we have
+ // any completed packets queued up, return the first to the caller.
+ if (this->complete_.is_empty ())
+ return 0;
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, complete_guard, this->complete_lock_, 0);
+ if (this->complete_.is_empty ())
+ return 0; // Another thread got here first, not a problem.
- ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
- guard,
- this->complete_lock_,
- !this->complete_.is_empty ());
+ TAO_PG::UIPMC_Recv_Packet *packet = 0;
+ if (this->complete_.dequeue_head (packet) == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - TAO_UIPMC_Mcast_Transport[%d]::recv_all, ")
+ ACE_TEXT ("unable to dequeue completed message\n"),
+ this->id ()));
+ return 0;
+ }
- // Add it to the complete queue.
- this->complete_.enqueue_tail (packet);
+ if (TAO_debug_level >= 9)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::")
+ ACE_TEXT ("recv_all, completed MIOP message %@ (DEQUEUED)\n"),
+ this->id (), static_cast<void *> (packet)));
+ }
- // The following break stops the eager de-queuing of
- // further completed MIOP messages. This should probably
- // be configuable via a MIOP Server -ORB option.
- break;
+ // If there is another message waiting to be processed (in addition
+ // to the one we have just taken off), notify another thread (if
+ // available) so this can also be processed in parrellel.
+ if (!this->complete_.is_empty ())
+ {
+ int const retval = this->notify_reactor_now ();
+ if (retval == 1)
+ {
+ // Now we have handed off to another thread, let the class
+ // know that it doesn't need to resume with OUR handle
+ // after we have processed our message.
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+ }
+ else if (retval < 0 && TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - TAO_UIPMC_Mcast_Transport[%d]::recv_all, ")
+ ACE_TEXT ("notify to the reactor failed.\n"),
+ this->id ()));
}
}
- return !this->complete_.is_empty ();
+ return packet;
}
int
@@ -411,47 +502,17 @@ TAO_UIPMC_Mcast_Transport::handle_input (
this->id ()));
}
- if (this->recv_all ())
+ // Grab the next completed MIOP message to process from the FIFO Queue.
+ ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> complete_owner (this->recv_all (rh));
+ if (TAO_PG::UIPMC_Recv_Packet *complete = complete_owner.get ())
{
- // Unqueue the first available completed message for us to process.
- TAO_PG::UIPMC_Recv_Packet *complete = 0;
- ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> owner (0);
- {
- ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->complete_lock_, 0);
- if (this->complete_.is_empty ())
- return 0; // Another thread got here first, no problem.
- if (this->complete_.dequeue_head (complete) == -1)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - TAO_UIPMC_Mcast_Transport[%d]::handle_input, ")
- ACE_TEXT ("unable to dequeue completed message\n"),
- this->id ()));
- return 0;
- }
- ACE_auto_ptr_reset (owner, complete);
-
- // If there is another message waiting to be processed (in addition
- // to the one we have just taken off to be processed), notify another
- // thread (if available) so this can also be processed in parrellel.
- if (!this->complete_.is_empty ())
- {
- int const retval = this->notify_reactor_now ();
- if (retval == 1)
- {
- // Now we have handed off to another thread, let the class
- // know that it doesn't need to resume with OUR handle
- // after we have processed our message.
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
- }
- else if (retval < 0 && TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - TAO_UIPMC_Mcast_Transport[%d]::handle_input, ")
- ACE_TEXT ("notify to the reactor failed.\n"),
- this->id ()));
- }
- }
- }
+ if (TAO_debug_level >= 9)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::")
+ ACE_TEXT ("handle_input, processing MIOP message %@ (%d bytes)\n"),
+ this->id (), static_cast<void *> (complete), complete->data_length ()));
+ }
// Create a data block from our dequeued completed message.
char *buffer= 0;
diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h
index 406fa8a84c9..75befb1e41d 100644
--- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h
+++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h
@@ -31,7 +31,6 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
// Forward decls.
class TAO_ORB_Core;
class TAO_UIPMC_Mcast_Connection_Handler;
-
namespace TAO_PG
{
class UIPMC_Recv_Packet_Cleanup_Guard;
@@ -118,8 +117,9 @@ private:
bool &stop_packet,
u_long &id_hash) const;
- /// Receive as much UDP packets as possible.
- bool recv_all (void);
+ /// Return the next complete MIOP packet, possiably dequeueing
+ /// as many as are available first from the socket.
+ TAO_PG::UIPMC_Recv_Packet *recv_all (TAO_Resume_Handle &rh);
/// Cleanup either all packets or expired only depending the
/// expired_only flag.
diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp
index 9c9e9f2b07c..2b986c4afd9 100644
--- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp
+++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp
@@ -330,10 +330,11 @@ TAO_UIPMC_Transport::send (
this_fragment_size-= static_cast<u_long> (already_sent))
{
// Make sure we don't send our fragments too quickly
- this->throttle_send_rate (
- factory->max_fragment_rate (),
- max_fragment_size,
- this_fragment_size);
+ if (factory->enable_throttling ())
+ this->throttle_send_rate (
+ factory->max_fragment_rate (),
+ max_fragment_size,
+ this_fragment_size);
// Haven't sent some of the data yet, we need to adjust the fragments iov's
// to skip the data we have actually manage to send so far.
@@ -382,7 +383,8 @@ TAO_UIPMC_Transport::send (
}
// Keep a note of the number of bytes we have just buffered
- this->total_bytes_outstanding_+= static_cast<u_long> (already_sent);
+ if (factory->enable_throttling ())
+ this->total_bytes_outstanding_+= static_cast<u_long> (already_sent);
} // Keep sending the rest of the fragment
// Increment the number of bytes of payload transferred.
diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp
index f61ff35a2f3..86002c5f9b2 100644
--- a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp
+++ b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp
@@ -20,6 +20,8 @@ TAO_MIOP_Resource_Factory::TAO_MIOP_Resource_Factory (void)
, send_hi_water_mark_ (0u) // Zero sets this to actual -ORBSndSock
, send_buffer_size_ (0u) // Zero is unspecified (-ORBSndSock).
, receive_buffer_size_ (0u) // Zero is unspecified (-ORBRcvSock).
+ , enable_throttling_ (!!(TAO_DEFAULT_MIOP_SEND_THROTTLING)) // Client-side SendRate throttling enabled.
+ , enable_eager_dequeue_ (!!(TAO_DEFAULT_MIOP_EAGER_DEQUEUEING)) // Server-side Multiple message dequeueing.
{
}
@@ -342,6 +344,18 @@ TAO_MIOP_Resource_Factory::receive_buffer_size (void) const
return receive_buffer_size_;
}
+bool
+TAO_MIOP_Resource_Factory::enable_throttling (void) const
+{
+ return enable_throttling_;
+}
+
+bool
+TAO_MIOP_Resource_Factory::enable_eager_dequeue (void) const
+{
+ return enable_eager_dequeue_;
+}
+
TAO_END_VERSIONED_NAMESPACE_DECL
// ****************************************************************
diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h
index a4cfac1b51d..8f45618c04d 100644
--- a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h
+++ b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h
@@ -90,8 +90,13 @@ public:
u_long receive_buffer_size (void) const;
//@}
-private:
+ /// Get the client-side transmission rate throttling enable flag.
+ bool enable_throttling (void) const;
+
+ /// Get the server-side eager complete message dequeuing enable flag.
+ bool enable_eager_dequeue (void) const;
+private:
enum Fragments_Cleanup_Strategy_Type
{
TAO_MIOP_CLEANUP_TIME_BOUND,
@@ -122,6 +127,12 @@ private:
/// Get the desired socket receive buffer's size in bytes.
u_long receive_buffer_size_;
+
+ /// Get the client-side transmission rate throttling enable flag.
+ bool enable_throttling_;
+
+ /// Get the server-side eager complete message dequeuing enable flag.
+ bool enable_eager_dequeue_;
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h b/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h
index b45947b340a..49afdf15567 100644
--- a/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h
+++ b/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h
@@ -98,6 +98,14 @@ static u_long const TAO_DEFAULT_MIOP_FRAGMENT_SIZE = MIOP_MAX_DGRAM_SIZE;
static u_long const TAO_DEFAULT_MIOP_MAX_FRAGMENTS = 0u; // Zero is unlimited
#endif
+#if !defined (TAO_DEFAULT_MIOP_SEND_THROTTLING)
+static bool const TAO_DEFAULT_MIOP_SEND_THROTTLING = true; // Enabled
+#endif
+
+#if !defined (TAO_DEFAULT_MIOP_EAGER_DEQUEUEING)
+static bool const TAO_DEFAULT_MIOP_EAGER_DEQUEUEING = true; // Enabled
+#endif
+
static CORBA::Octet const miop_magic[4] = {
0x4d, 0x49, 0x4f, 0x50
}; // in ASCII this is 'M', 'I', 'O', 'P'