diff options
author | sma <sma@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2012-08-10 14:01:17 +0000 |
---|---|---|
committer | sma <sma@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2012-08-10 14:01:17 +0000 |
commit | 1c7f93a50ec5038a953063118ad4160f57aa1d0c (patch) | |
tree | c7ba4ecb80e8c6b31df78e61d57a651010898746 | |
parent | 5343533e453cc697555b4b7d08e5dfd3cc312034 (diff) | |
download | ATCD-1c7f93a50ec5038a953063118ad4160f57aa1d0c.tar.gz |
Fri Aug 10 14:00:00 UTC 2012 Simon Massey <sma at prismtech dot com> MIOP Fragmentation
47 files changed, 4102 insertions, 950 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 37404f291b5..851a1fe10b2 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,60 @@ +Fri Aug 10 14:00:00 UTC 2012 Simon Massey <sma at prismtech dot com> + + * NEWS: + * docs/Options.html: + * tao/Connection_Handler.cpp: + * orbsvcs/orbsvcs/PortableGroup/MIOP.cpp: + * orbsvcs/orbsvcs/PortableGroup/miopconf.h: + * orbsvcs/orbsvcs/PortableGroup/PortableGroup_Loader.cpp: + * orbsvcs/orbsvcs/PortableGroup/README: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.h: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.h: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.h: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Endpoint.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Factory.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.h: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.h: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.h: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.h: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp: + + Enhanced the MIOP protocol to support MIOP Fragmentation. + + * orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.h: + * orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.inl: + * orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.cpp: + * orbsvcs/orbsvcs/PortableGroup/miop_resource.h: + * orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.h: + * orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.cpp: + + New files added to support MIOP Fragmentation. + + * bin/tao_other_tests.lst: + * orbsvcs/tests/Miop/McastFragmentation/client.cpp: + * orbsvcs/tests/Miop/McastFragmentation/Hello_Impl.h: + * orbsvcs/tests/Miop/McastFragmentation/Hello_Impl.cpp: + * orbsvcs/tests/Miop/McastFragmentation/Hello.idl: + * orbsvcs/tests/Miop/McastFragmentation/McastFragmentation.mpc: + * orbsvcs/tests/Miop/McastFragmentation/README: + * orbsvcs/tests/Miop/McastFragmentation/run_test.pl: + * orbsvcs/tests/Miop/McastFragmentation/run_test_ipv6.pl: + * orbsvcs/tests/Miop/McastFragmentation/server.cpp: + * orbsvcs/tests/Miop/McastFragmentation/uipmc_client.conf: + * orbsvcs/tests/Miop/McastFragmentation/uipmc_server.conf: + * orbsvcs/tests/Miop/McastFragmentation/uipmc_server_m.conf: + * orbsvcs/tests/Miop/McastFragmentation/uipmc_server_n.conf: + + Added a test for MIOP Fragmentation. + Fri Aug 10 08:30:00 UTC 2012 Simon Massey <sma at prismtech dot com> * orbsvcs/FaultTolerance/FT_Invocation_Endpoint_Selectors.cpp: @@ -28,6 +28,151 @@ USER VISIBLE CHANGES BETWEEN TAO-2.1.3 and TAO-2.1.4 observed to be required for the UIPMCAcceptor to open on the most recent Linux distribs. +. Enhanced the MIOP implementation; this protocol now supports MIOP message + fragmentation which is controlled via the new MIOP_Strategy_Factory. + This factory is located in the TAO_PortableGroup library and can be loaded + dynamically using a service configurator directive of the form: + + dynamic MIOP_Resource_Factory Service_Object * TAO_PortableGroup:_make_TAO_MIOP_Resource_Factory () "" + + Normally however, in order to setup the MIOP implementation correctly, the + application will have to use additional service configurator directives along + with the new MIOP_Strategy_Factory (such as for example): + + dynamic UIPMC_Factory Service_Object * TAO_PortableGroup:_make_TAO_UIPMC_Protocol_Factory() "" + static Resource_Factory "-ORBProtocolFactory IIOP_Factory -ORBProtocolFactory UIPMC_Factory" + dynamic PortableGroup_Loader Service_Object * TAO_PortableGroup:_make_TAO_PortableGroup_Loader() "" + dynamic MIOP_Resource_Factory Service_Object * TAO_PortableGroup:_make_TAO_MIOP_Resource_Factory () "" + + Any options required to be given to the MIOP_Resource_Factory should be + specified between the two double-quotes shown above as a space seporated list; + however none are required as all options take default values if not specified. + Since MIOP uses UDP sockets (which are not a "reliable" two-way transport) it + is easy to configure MIOP in such a way that messages will not actually reach + the targeted servant. For this reason the MIOP_Resource_Factory understands + several options which if used with care can maximize MIOP reliability. Users + of MIOP must understand that messages are broadcast in possibly multiple + individual network data packets and they all have to be reassembled by the + server in their entirety to be useable by the servant. If even a single data + fragment/packet is lost, the whole message cannot be reconstructed and will be + unusable. There is no way for the servant to even know it has missed such a + MIOP message, and being a one-way protocol, neither will the client be aware that + the message has been lost. Fragments can be lost due to a variety of reasons: + + 1) The client sends too large a message fragment, or sends messages too fast, + overwhelming the client socket's transmission buffer. (In which case the + client-side OS simply ignores the excess send requests and some of the + fragments are not actually sent on the wire.) + 2) The server socket's receive buffer became too full to aquire the fragments + off the wire. (In this case a single client is, or multiple clients are, + again sending messages too fast, but this time it is the server/servant + that is too slow to process the messages it has already received.) + 3) Something happend to the physical network or the data routing between the + two systems and the packet was lost between the client and the server. + + In the first two cases above, the loss can be mitigated by the client adapting + (or throttling back) its sending rate so as to not overflow the capacity of the + clients operating system sending buffer and the corresponding receiving buffer + of the server socket. If there are multiple clients all sending to the same + server, all of these clients must be configured to slow down their sending rate + as otherwise the server's receive buffer will become swamped and some messages + will inevitably be lost. + + The MIOP_Strategy_Factory accepts the following options: + + -ORBMaxFragments limit + This is a client-side option used to limit the maximum number of fragments + that a client can break the outgoing message up into. The limit must be a + positive number or zero (indicating an unlimited number of fragments may be + used). The default is normally 0 (i.e. unlimited), but this default can be + overriden when the TAO libraries are built in the ace/config.h, by specifying + the new default limit such as #define TAO_DEFAULT_MIOP_MAX_FRAGMENTS 1 + which in this case would turn off fragmentation by default (as only a single + fragment would be allowed), unless a new limit is specified in the service + file. Any messages that are too large and require more fragments than allowed + by this setting are simply not transmitted by the client (the message is + effectly lost without any error indication). This setting can be used as a + safty setting to stop swamping the network and servant with abnormally large + messages or during testing to "loose" large messages for whatever reason. + + -ORBMaxFragmentSize bytes + Another client-side option used to limit the number of bytes in each + individual MIOP fragment, which must be between 272 and ACE_MAX_UDP_PACKET_SIZE + (normally 65507) bytes inclusive. Smaller values increase (and larger values + decrease) the number of fragments required to send the actual payload data. + Each fragment requires the overhead of a new MIOP header (32 bytes) prefixing + the actual data being sent, with the header reducing the actual usable payload + data inside each fragment. Roughly this value can be considered as the MTU + (Maximum Transmission Unit) set for the specific connection. The default takes + the same value as the ACE_MAX_UDP_PACKET_SIZE for the system, but this default + can be overridden in the ace/config.h by giving a new value for + #define TAO_DEFAULT_MIOP_FRAGMENT_SIZE 65507 when the TAO libraries are built. + Note: most gateways and routing networks define an MTU of around 1460 bytes, it + is therefore advisable to specify the correct -ORBMaxFragmentSize value or smaller + otherwise some fragments may be lost. See also the -ORBSndSock option below. + + -ORBMaxFragmentRate microseconds + This client-side option specifies a non-zero, positive amount of time (in uSec, + i.e. microseconds) that it takes to transmit and process an individual message + fragment of the maximum size. (This is the total of the client->server->servant + processing time that is expected.) The client attempts to keep the rate at which + it sends messages down to this speed by purposly delay sending individual message + fragments, if it would exceeding this capacity (but also see the other option + -ORBSendHighWaterMark below). The default is same value as was specified, or is + defaulted by, the -ORBMaxFragmentSize option, but this time teated as microseconds + not bytes. (This default delay value is approximatly the time required to transmit + that number of bytes on a 10Base-T network.) Larger values will throttle back the + clients transmittion rate, smaller values will speed it up. + + -ORBSendHighWaterMark bytes + This client-side option is the usable size of the message buffer, i.e how much + data can be sent without introducing delays into the transmition of individual + MIOP fragments (due to the -ORBMaxFragmentRate option above). Its default value is + the size of the buffer maintained by the Operating System for the sending socket + at the client side (see the -ORBSndSock and -ORBRcvSock options below). + + -ORBSndSock bytes + This client-side option is the size of the outgoing socket's message buffer. If + specified in the service configuration file, this value will override (for MIOP + only) the value specified by the corresponding ORB_init parameter. If unspecified + in either place, the default value for the system itself will be used. + NOTE: the -ORBSndSock size normally limits the maximum size of an individual + message fragment, larger fragments can be simply ignored (i.e. packet loss) by the + client's socket without any error indication; this value and the -ORBMaxFragmentSize + should be set with care. For a linux type OS, the systems SndSock is usually defaulted + to about 65535 bytes, but whatever value is actually specified it is normally doubled + internally to take account of the control structures required to track the messages + themselves and so it is not a hard limit. + + -ORBRcvSock bytes + This server-side option is the size of the incoming socket's message buffer, i.e + how much data can be received directly off the wire and queued for processing by + the server whilst the servant is busy. If specified in the service configuration file, + this value will override (for MIOP only) the value specified by the corresponding + ORB_init parameter. If unspecified in either place, the default value for the system + itself will be used. Again for a linux type OS, the systems RcvSock is usually about + 65535 bytes, but whatever value is actually specified it is normally doubled to take + account of the control structures required to track the messages themselves and so it + is not a hard limit. Specifying as large a value as possible with the MIOP -ORBRcvSock + option is advisable to maximize the available sockets receive buffer space. + + -ORBFragmentsCleanupStrategy DELAY | NUMBER | MEMORY + This option is used on the server to specify the incomplete fragments cleanup + strategy. The default is DELAY, indicating that the fragments that cannot be + reassembled after a certain delay should be removed from the waiting queue (i.e. + considered lost messages). The other options mean that the total number of + messages in the waiting queue will be limited, or that the whole memory comsumed + by the incomplete messages in the waiting queue will be limited. + + -ORBFragmentsCleanupBound limit + This option specifies the numerical limit for the server's option above. If the + strategy is DELAY, the value indicates the delay in milliseconds (defaulting to + 1000 milliseconds i.e. 1 second if -ORBFragmentsCleanupBound is not specified). + If the strategy is NUMBER, the limit indicates the number of non-reassembled + messages in the queue (defaulting to 5 messages). If the strategy is MEMORY, + the limit indicates the number of bytes reserved for the whole queue (with the + default being 3000000 bytes). + USER VISIBLE CHANGES BETWEEN TAO-2.1.2 and TAO-2.1.3 ==================================================== diff --git a/TAO/bin/tao_other_tests.lst b/TAO/bin/tao_other_tests.lst index b0d68306f6c..eba1a575396 100644 --- a/TAO/bin/tao_other_tests.lst +++ b/TAO/bin/tao_other_tests.lst @@ -190,6 +190,8 @@ TAO/orbsvcs/tests/Miop/McastHello/run_test.pl: !MINIMUM !CORBA_E_COMPACT !CORBA_ TAO/orbsvcs/tests/Miop/McastZIOP/run_test.pl: !MINIMUM !CORBA_E_COMPACT !CORBA_E_MICRO !STATIC !NO_MCAST ZLIB TAO/orbsvcs/tests/Miop/McastPreferredInterfaces/run_test.pl: !MINIMUM !CORBA_E_COMPACT !CORBA_E_MICRO !STATIC !NO_MCAST TAO/orbsvcs/tests/Miop/McastPreferredInterfaces/run_test_ipv6.pl: !MINIMUM !CORBA_E_COMPACT !CORBA_E_MICRO !STATIC !NO_MCAST IPV6 +TAO/orbsvcs/tests/Miop/McastFragmentation/run_test.pl: !MINIMUM !CORBA_E_COMPACT !CORBA_E_MICRO !STATIC !NO_MCAST +TAO/orbsvcs/tests/Miop/McastFragmentation/run_test_ipv6.pl: IPV6 !MINIMUM !CORBA_E_COMPACT !CORBA_E_MICRO !STATIC !NO_MCAST # The following 2 tests use dynamic loading to change the default reactor on Windows !VxWorks !VxWorks_RTP !LabVIEW_RT TAO/orbsvcs/tests/LoadBalancing/GenericFactory/Application_Controlled/run_test.pl: !MINIMUM !CORBA_E_COMPACT !CORBA_E_MICRO !DISABLE_INTERCEPTORS !STATIC !ACE_FOR_TAO !LynxOS !ST TAO/orbsvcs/tests/LoadBalancing/GenericFactory/Infrastructure_Controlled/run_test.pl: !MINIMUM !CORBA_E_COMPACT !CORBA_E_MICRO !DISABLE_INTERCEPTORS !STATIC !ACE_FOR_TAO !LynxOS diff --git a/TAO/docs/Options.html b/TAO/docs/Options.html index ed99bd25556..46688442ed9 100644 --- a/TAO/docs/Options.html +++ b/TAO/docs/Options.html @@ -36,6 +36,7 @@ Selection </a> </li> </li> <li><a href="#TSSF">Server_Strategy_Factory </a> </li> <li><a href="#TCSF">Client_Strategy_Factory </a></li> + <li><a href="#TMSF">MIOP_Strategy_Factory </a></li> </ol> </li> </ul> @@ -705,8 +706,8 @@ is <code>0</code>. This option is disabled (<code>0</code>) by default.</td> specifies the third lane of that thread pool. Note that <code>0</code> should be used for the lane when specifying endpoints for thread pools without lanes. <code>*:*</code> - can be used to specify all pools and lanes, <code>1:*</code> means - all lanes from pool <code>1</code>, and <code>*:1</code> + can be used to specify all pools and lanes, <code>1:*</code> means + all lanes from pool <code>1</code>, and <code>*:1</code> means lane <code>1</code> from all pools. <p>Sets of endpoints may be specified using multiple @@ -1003,6 +1004,11 @@ creating strategies useful for clients such as request multiplexing strategies, wait strategies, connect strategies etc. <p></p> </li> + <li> <a href="#TMSF">MIOP Strategy Factory.</a> This factory is +responsible for controlling the behavior of clients and servers that +use MIOP protocol. + <p></p> + </li> <li> <a href="#TTSM">Time Policy Strategy Manager.</a> This factory manages the TIME_POLICY strategy used by the ORB for timers and countdowns. @@ -1217,7 +1223,7 @@ until all the data is sent. <tr> <td><code>-ORBOutputCDRAllocator</code> <em>mmap|local_memory_pool</em></td> <td><a name="-ORBOutputCDRAllocator"></a>When the define - <code>TAO_USE_OUTPUT_CDR_MMAP_MEMORY_POOL</code> is set to 1 then always the mmap pool + <code>TAO_USE_OUTPUT_CDR_MMAP_MEMORY_POOL</code> is set to 1 then always the mmap pool will be used. </td> </tr> @@ -1226,9 +1232,9 @@ until all the data is sent. <td><a name="-ORBZeroCopyWrite"></a> Use a zero copy write protocol, which at this moment the only option is sendfile. If your platform does support sendfile but you don't want - that TAO uses it you can disable - sendfile in TAO by add the define <code>TAO_HAS_SENDFILE 0</code> - to your config.h file. + that TAO uses it you can disable + sendfile in TAO by add the define <code>TAO_HAS_SENDFILE 0</code> + to your config.h file. </td> </tr> </tbody> @@ -1607,7 +1613,7 @@ When this strategy is set to RW, then also the -ORBFlushingStrategy <tr> <td><code>-ORBConnectionHandlerCleanup</code> <em>0 | 1</em><br> </td> - <td><a name="-ORBConnectionHandlerCleanup"></a>Setting this + <td><a name="-ORBConnectionHandlerCleanup"></a>Setting this option to <em>1</em> lets the ORB know that connection handlers setup for sending messages need to be cleaned up when errors occur. This option has an effect only for @@ -1660,7 +1666,128 @@ strategy. </table> </p> </blockquote> -<h4><a name="TTSM">4. Time_Policy_Manager</a></h4> +<h4><a name="TMSF">4. MIOP_Strategy_Factory</a></h4> +This factory is located in the <code>TAO_PortableGroup</code> library and it +accepts the options below. This factory can be loaded dynamically +using a service configurator directive of the form (all on one line): + +<p><code>dynamic MIOP_Resource_Factory Service_Object * TAO_PortableGroup:_make_TAO_MIOP_Resource_Factory () ""</code></p> + +Normally in order to setup the MIOP implementation correctly, the application will +have to use other service configurator directives (for example): + +<p><code>dynamic UIPMC_Factory Service_Object * TAO_PortableGroup:_make_TAO_UIPMC_Protocol_Factory() ""<br> +static Resource_Factory "-ORBProtocolFactory IIOP_Factory -ORBProtocolFactory UIPMC_Factory"<br> +dynamic PortableGroup_Loader Service_Object * TAO_PortableGroup:_make_TAO_PortableGroup_Loader() ""</code></p> + +Since MIOP uses UDP sockets (which is not a "reliable" transport unlike tcp/ip) it is easy to configure MIOP +in such a way that messages will not actually reach the servant. The options below are +intended to maximize MIOP reliability but they must be used with care, users of MIOP +must understand that messages are sent in fragments and they have to be reassembled +by the server in their entirety to be useable by the servant. If a single fragment +is lost, the whole message cannot be reconstructed and will be unusable. +Fragments can be lost due to a variety of reasons:</p> +<li>The client sends too large a message fragment, or sends messages too fast, +overwhelming the client socket's transmission buffer. (In which case the client-side OS simply +ignores the excess send requests and some of the fragments are not actually sent on the wire.)</li> +<li>The server socket's receive buffer became too full to aquire the fragments off the wire. (In this +case the client is again sending messages too fast, but this time it is the server that is too slow +to process the messages it has already received.)</li> +<li>Something happend to the network or routing and the packet was lost between the client and the server.</li> +<p>In the first two cases above, the loss can be mitigated by the client adapting/throttling its sending +rate so as to not overflow the capacity of the clients operating system sending buffer and the receiving +buffer of the server socket. If there are multiple clients all sending to the same server, all of these +clients must be configured to slow down their sending rate as otherwise the server's receive buffer will +become swamped and some messages will be lost.</p> +<blockquote> + <p> + <table border="2" cellpadding="0" cellspacing="2"> + <tbody> + <tr> + <th>Option</th> + <th>Description</th> + </tr> + <tr> + <td><code>-ORBMaxFragments</code> <em>limit</em></td> + <td>This is a client-side option used to limit the maximum number of fragments that a client can break +the outgoing message up into. The limit must be a positive number or zero (indicating an unlimited number). +The default is normally <em>0 (i.e. unlimited)</em>, but this default can be overriden when the TAO libraries +are built in the <code>ace/config.h</code>, by specifying the new default limit such as +<code>#define TAO_DEFAULT_MIOP_MAX_FRAGMENTS 1</code> which in this case would turn off fragmentation by +default (as only a single fragment would be allowed), unless a new limit is specified in the service file. +Any messages that are too large and require more fragments than allowed by this setting are simply not +transmitted by the client (the message is effectly lost without any error indication). This setting can be +used as a safety setting to stop swamping the network and servant with abnormally large messages. + </td> + </tr> + <tr> + <td><code>-ORBMaxFragmentSize</code> <em>bytes</em></td> + <td>Another client-side option used to limit the number of bytes in each individual MIOP fragment, +which must be between <em>272</em> and <em>65507</em> bytes inclusive. Smaller values increase (and larger +values decrease) the number of fragments required to send the actual payload data. Each fragment requires the +overhead of a new MIOP header (32 bytes) prefixing the actual data being sent, with the header reducing +the actual usable payload data inside each fragment. Roughly this value can be considered as the MTU +(Maximum Transmission Unit) set for the specific connection. The default takes the same value as the +<code>ACE_MAX_UDP_PACKET_SIZE</code> for the system (normally <em>65507</em>), but this default can be overridden +in the <code>ace/config.h</code> by giving a new value for <code>#define TAO_DEFAULT_MIOP_FRAGMENT_SIZE 65507</code> +when the TAO libraries are built. <b>Note: most gateways and routing networks define an MTU of around <em>1460</em>, +it is therefore advisable to specify the correct <code>-ORBMaxFragmentSize</code> value otherwise some fragments +may be lost. See also the <code>-ORBSndSock</code> option below.</b> + </td> + </tr> + <tr> + <td><code>-ORBMaxFragmentRate</code> <em>microseconds</em></td> + <td>This client-side option specifies a non-zero, positive amount of time (in microseconds) that it takes +to transmit and process an individual message fragment of the maximum size. (This is the total of the client -> server +-> servant processing time.) The client attempts to keep the rate at which it sends messages down to this speed by +purposly delaying sending message fragments, if it would exceeding this capacity (but also see the +<code>-ORBSendHighWaterMark</code> option below). The default is same value as specified by the +<code>-ORBMaxFragmentSize</code> option but this time in microseconds, not bytes. (This default delay value is approximatly the time required +to transmit that number of bytes on a 10Base-T network.) Larger values will throttle back the clients transmittion +rate, smaller values will speed it up. + </td> + </tr> + <tr> + <td><code>-ORBSendHighWaterMark</code> <em>bytes</em></td> + <td>This client-side option is the usable size of the message buffer, i.e how much data can be sent without +introducing delays into the transmition of individual MIOP fragments (due to the <code>-ORBMaxFragmentRate</code> +option above). Its default value is the size of the buffer maintained by the Operating System for the sending socket +at the client side (see the <code>-ORBSndSock</code> and <code>-ORBRcvSock</code> options below).</b> + </td> + </tr> + <tr> + <td><code>-ORBSndSock</code> <em>bytes</em></td> + <td>This client-side option is the size of the outgoing socket's message buffer. If specified in the service configuration file, this value will override (for MIOP only) the value specified by the corresponding ORB_init parameter. If unspecified in either place, the default value for the system itself will be used. <b>NOTE: the <code>-ORBSndSock</code> size normally limits the maximum size of an individual message fragment, larger fragments can be simply ignored (i.e. packet loss) by the client's socket without any error indication; this value and the <code>-ORBMaxFragmentSize</code> should be set with care.</b> + </td> + </tr> + <tr> + <td><code>-ORBRcvSock</code> <em>bytes</em></td> + <td>This server-side option is the size of the incoming socket's message buffer, i.e how much data can be received directly off the wire and queued for processing by the servant whilst it is busy. If specified in the service configuration file, this value will override (for MIOP only) the value specified by the corresponding ORB_init parameter. If unspecified in either place, the default value for the system itself will be used. + </td> + </tr> + <tr> + <td><code>-ORBFragmentsCleanupStrategy</code> <em>DELAY | NUMBER | MEMORY</em></td> + <td>This option is used on the server to specify the incomplete fragments cleanup strategy. +The default is <em>DELAY</em>, indicating that the fragments that cannot be reassembled after a +certain delay should be removed from the waiting queue (i.e. considered lost messages). The other options are <em>NUMBER</em> and +<em>MEMORY</em>, which respectively mean the number of messages in the waiting queue will be limited, +or the whole memory comsumed by the incomplete messages in the waiting queue will be limited. + </td> + </tr> + <tr> + <td><code>-ORBFragmentsCleanupBound</code> <em>limit</em></td> + <td>This option specifies the numerical limit for the server's <code>-ORBFragmentsCleanupStrategy</code> +option. If the strategy is <em>DELAY</em>, the value indicates the delay in milliseconds +(defaulting to 1000 milliseconds i.e. 1 second). If the strategy is <em>NUMBER</em>, the limit indicates the +number of non-reassembled messages in the queue (defaulting to 5 messages). If the strategy is <em>MEMORY</em>, +the limit indicates the number of bytes reserved for the whole queue (with the default being 3000000 bytes). + </td> + </tr> + </tbody> + </table> + </p> +</blockquote> +<h4><a name="TTSM">5. Time_Policy_Manager</a></h4> The TIME_POLICY manager controls the actual TIME_POLICY strategy used for ORB timers and countdowns. TAO provides a default strategy manager called <code>Time_Policy_Manager</code>. diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.cpp new file mode 100644 index 00000000000..6616cfd978f --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.cpp @@ -0,0 +1,239 @@ +// $Id$ + +#include "orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.h" +#include "orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.h" + +#if !defined (__ACE_INLINE__) +# include "orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.inl" +#endif /* __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace +{ + typedef TAO_UIPMC_Mcast_Transport::Packets_Map::iterator HASH_MAP_ITER; + typedef TAO_UIPMC_Mcast_Transport::Packets_Map::value_type HASH_MAP_ENTRY; + typedef HASH_MAP_ENTRY** DESCRIPTOR_SET; + + // The two functions below are stolen from Transport_Cache_Manager. +#if !defined (ACE_LACKS_QSORT) + int + cpscmp (void const *a, void const *b) + { + HASH_MAP_ENTRY * const * left = + reinterpret_cast<HASH_MAP_ENTRY * const *> (a); + HASH_MAP_ENTRY * const * right = + reinterpret_cast<HASH_MAP_ENTRY * const *> (b); + + if ((*left)->int_id_->started () < (*right)->int_id_->started ()) + return -1; + + if ((*left)->int_id_->started () > (*right)->int_id_->started ()) + return 1; + + return 0; + } +#endif /* ACE_LACKS_QSORT */ + + void + sort_set (DESCRIPTOR_SET &entries, int current_size) + { +#if defined (ACE_LACKS_QSORT) + // Use insertion sort if we don't have qsort + for(int i = 1; i < current_size; ++i) + { + if (entries[i]->int_id_->started () < + entries[i - 1]->int_id_->started ()) + { + HASH_MAP_ENTRY* entry = entries[i]; + + for(int j = i; j > 0 && + entries[j - 1]->int_id_->started () > + entry->int_id_->started (); --j) + { + HASH_MAP_ENTRY* holder = entries[j]; + entries[j] = entries[j - 1]; + entries[j - 1] = holder; + } + } + } +#else + ACE_OS::qsort (entries, current_size, + sizeof (HASH_MAP_ENTRY *), + reinterpret_cast<ACE_COMPARE_FUNC> (cpscmp)); +#endif /* ACE_LACKS_QSORT */ + } + +} + +namespace TAO_PG +{ + + Fragments_Cleanup_Strategy::~Fragments_Cleanup_Strategy (void) + { + } + + void + Time_Bound_Fragments_Cleanup_Strategy::cleanup ( + TAO_UIPMC_Mcast_Transport::Packets_Map &packets + ) + { + for (HASH_MAP_ITER iter = packets.begin (); + iter != packets.end ();) + { + // Move forward iter because what it was pointing to could be + // unbound at the end of the loop leaving the iterator pointing + // to removed entry. + HASH_MAP_ITER cur_iter = iter++; + + // bound_ is in milliseconds. + ACE_Time_Value const delay (0, 1000 * this->bound_); + ACE_Time_Value const now = ACE_OS::gettimeofday (); + + if ((*cur_iter).item ()->started () != ACE_Time_Value::zero && + now <= (*cur_iter).item ()->started () + delay) + continue; + + if (TAO_debug_level >= 8) + { + ACE_TCHAR const *b = + (*cur_iter).item ()->started () == ACE_Time_Value::zero ? + ACE_TEXT ("broken ") : ACE_TEXT (""); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TBFCS::cleanup, ") + ACE_TEXT ("cleaning %s%d bytes (hash %d)\n"), + b, (*cur_iter).item ()->data_length (), + (*cur_iter).key ())); + } + + ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> guard ((*cur_iter).item ()); + packets.unbind (cur_iter); + } + } + + void + Number_Bound_Fragments_Cleanup_Strategy::cleanup ( + TAO_UIPMC_Mcast_Transport::Packets_Map &packets + ) + { + int const current_size = static_cast<int> (packets.current_size ()); + + // bound_ is a number of packets. + if (current_size <= this->bound_) + return; + + DESCRIPTOR_SET sorted_set; + ACE_NEW (sorted_set, HASH_MAP_ENTRY*[current_size]); + ACE_Auto_Array_Ptr<HASH_MAP_ENTRY*> owner (sorted_set); + + HASH_MAP_ITER iter = packets.begin (); + + for (int i = 0; i < current_size; ++i) + { + sorted_set[i] = &(*iter); + ++iter; + } + + // Sort in ascending order. + sort_set (sorted_set, current_size); + + // Since started() can return ACE_Time_Value::zero (lowest possible value) + // then those broken packets will in the beginning of the sorted_set. + for (int i = 0; i < current_size; ++i) + { + if (current_size - i <= this->bound_) + break; + + if (TAO_debug_level >= 8) + { + ACE_TCHAR const *b = + sorted_set[i]->item ()->started () == ACE_Time_Value::zero ? + ACE_TEXT ("broken ") : ACE_TEXT (""); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - NBFCS::cleanup, ") + ACE_TEXT ("cleaning %s%d bytes (hash %d)\n"), + b, sorted_set[i]->item ()->data_length (), + sorted_set[i]->key ())); + } + + ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> guard (sorted_set[i]->item ()); + packets.unbind (sorted_set[i]); + } + } + + void + Memory_Bound_Fragments_Cleanup_Strategy::cleanup ( + TAO_UIPMC_Mcast_Transport::Packets_Map &packets + ) + { + // First we need to calculate the size of packets. Since we anyway run + // this loop we can also cleanup broken packets. + CORBA::ULong size = 0; + for (HASH_MAP_ITER iter = packets.begin (); + iter != packets.end ();) + { + // Move forward iter because what it was pointing to could be + // unbound at the end of the loop leaving the iterator pointing + // to removed entry. + HASH_MAP_ITER cur_iter = iter++; + + if ((*cur_iter).item ()->started () == ACE_Time_Value::zero) + { + if (TAO_debug_level >= 8) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - MBFCS::cleanup, ") + ACE_TEXT ("cleaning broken %d bytes (hash %d)\n"), + (*cur_iter).item ()->data_length (), + (*cur_iter).key ())); + + ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> guard ((*cur_iter).item ()); + packets.unbind (cur_iter); + } + else + size += (*cur_iter).item ()->data_length (); + } + + // bound_ is a number of bytes. + if (static_cast<int> (size) <= this->bound_) + return; + + int const current_size = static_cast<int> (packets.current_size ()); + + DESCRIPTOR_SET sorted_set; + ACE_NEW (sorted_set, HASH_MAP_ENTRY*[current_size]); + ACE_Auto_Array_Ptr<HASH_MAP_ENTRY*> owner (sorted_set); + + HASH_MAP_ITER iter = packets.begin (); + + for (int i = 0; i < current_size; ++i) + { + sorted_set[i] = &(*iter); + ++iter; + } + + sort_set (sorted_set, current_size); + + // We have to clean all those packets above bound. + for (int i = 0; i < current_size; ++i) + { + if (static_cast<int> (size) <= this->bound_) + break; + + if (TAO_debug_level >= 8) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - MBFCS::cleanup, ") + ACE_TEXT ("cleaning %d bytes (hash %d)\n"), + sorted_set[i]->item ()->data_length (), + sorted_set[i]->key ())); + + size -= sorted_set[i]->item ()->data_length (); + ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> guard (sorted_set[i]->item ()); + packets.unbind (sorted_set[i]); + } + } + +} // namespace TAO_PG + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.h b/TAO/orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.h new file mode 100644 index 00000000000..4915c0d86ed --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.h @@ -0,0 +1,114 @@ +// -*- C++ -*- + +// ================================================================ +/** + * @file Fragments_Cleanup_Strategy.h + * + * $Id$ + * + * @author Vladimir Zykov <vz@prismtech.com> + */ +// ================================================================ + +#ifndef TAO_FRAGMENTS_CLEANUP_STRATEGY_H +#define TAO_FRAGMENTS_CLEANUP_STRATEGY_H + +#include /**/ "ace/pre.h" + +#include "orbsvcs/PortableGroup/portablegroup_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h" + +#include /**/ "tao/Versioned_Namespace.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +// **************************************************************** + +namespace TAO_PG +{ + + /** + * @class Fragments_Cleanup_Strategy + * + * @brief The base class for all cleanup strategies + * + * This class is used by TAO_UIPMC_Mcast_Transport for cleaning + * received fragments. + */ + class TAO_PortableGroup_Export Fragments_Cleanup_Strategy + { + public: + /// The constructor + Fragments_Cleanup_Strategy (int bound); + + /// The destructor + virtual ~Fragments_Cleanup_Strategy (void); + + /// Sub-classes must implement these methods + virtual void cleanup (TAO_UIPMC_Mcast_Transport::Packets_Map &) = 0; + + protected: + /// The bound that triggers a cleanup. + int bound_; + }; + + /** + * @class Time_Bound_Fragments_Cleanup_Strategy + * + * @brief Cleanup if a message cannot reassemble for a long time. + */ + class TAO_PortableGroup_Export Time_Bound_Fragments_Cleanup_Strategy + : public Fragments_Cleanup_Strategy + { + public: + /// The constructor + Time_Bound_Fragments_Cleanup_Strategy (int bound); + + virtual void cleanup (TAO_UIPMC_Mcast_Transport::Packets_Map &packets); + }; + + /** + * @class Number_Bound_Fragments_Cleanup_Strategy + * + * @brief Cleanup if there are too many messages cannot reassemble. + */ + class TAO_PortableGroup_Export Number_Bound_Fragments_Cleanup_Strategy + : public Fragments_Cleanup_Strategy + { + public: + /// The constructor + Number_Bound_Fragments_Cleanup_Strategy (int bound); + + virtual void cleanup (TAO_UIPMC_Mcast_Transport::Packets_Map &packets); + }; + + /** + * @class Memory_Bound_Fragments_Cleanup_Strategy + * + * @brief Cleanup if non-reassembled messages take too much memory. + */ + class TAO_PortableGroup_Export Memory_Bound_Fragments_Cleanup_Strategy + : public Fragments_Cleanup_Strategy + { + public: + /// The constructor + Memory_Bound_Fragments_Cleanup_Strategy (int bound); + + virtual void cleanup (TAO_UIPMC_Mcast_Transport::Packets_Map &packets); + }; + +} // namespace TAO_PG + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif /* TAO_FRAGMENTS_CLEANUP_STRATEGY_H */ diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.inl b/TAO/orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.inl new file mode 100644 index 00000000000..0cb2424d222 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.inl @@ -0,0 +1,37 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO_PG +{ + ACE_INLINE + Fragments_Cleanup_Strategy::Fragments_Cleanup_Strategy (int bound) + : bound_ (bound) + { + } + + ACE_INLINE + Time_Bound_Fragments_Cleanup_Strategy:: + Time_Bound_Fragments_Cleanup_Strategy (int bound) + : Fragments_Cleanup_Strategy (bound) + { + } + + ACE_INLINE + Number_Bound_Fragments_Cleanup_Strategy:: + Number_Bound_Fragments_Cleanup_Strategy (int bound) + : Fragments_Cleanup_Strategy (bound) + { + } + + ACE_INLINE + Memory_Bound_Fragments_Cleanup_Strategy:: + Memory_Bound_Fragments_Cleanup_Strategy (int bound) + : Fragments_Cleanup_Strategy (bound) + { + } +} // namespace TAO_PG + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/MIOP.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/MIOP.cpp index aad6e727273..103c0ccc3cf 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/MIOP.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/MIOP.cpp @@ -5,6 +5,7 @@ #include "ace/Service_Config.h" #include "orbsvcs/PortableGroup/PortableGroup_Loader.h" #include "orbsvcs/PortableGroup/PG_Object_Adapter_Factory.h" +#include "orbsvcs/PortableGroup/miop_resource.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL @@ -19,6 +20,9 @@ namespace TAO ACE_Service_Config::process_directive ( ace_svc_desc_TAO_PG_Object_Adapter_Factory); + ACE_Service_Config::process_directive ( + ace_svc_desc_TAO_MIOP_Resource_Factory); + return ACE_Service_Config::process_directive ( ace_svc_desc_TAO_UIPMC_Protocol_Factory); } diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/PortableGroup_Loader.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/PortableGroup_Loader.cpp index 493ac22733e..51915a2a5b9 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/PortableGroup_Loader.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/PortableGroup_Loader.cpp @@ -3,6 +3,7 @@ #include "orbsvcs/PortableGroup/PortableGroup_Loader.h" #include "orbsvcs/PortableGroup/PG_Object_Adapter_Factory.h" #include "orbsvcs/PortableGroup/PortableGroup_ORBInitializer.h" +#include "orbsvcs/PortableGroup/miop_resource.h" #include "ace/Dynamic_Service.h" #include "tao/ORB_Core.h" #include "tao/ORBInitializer_Registry.h" @@ -51,6 +52,7 @@ int TAO_PortableGroup_Loader::Initializer (void) { ACE_Service_Config::process_directive (ace_svc_desc_TAO_PortableGroup_Loader); + ACE_Service_Config::process_directive (ace_svc_desc_TAO_MIOP_Resource_Factory); return -1; } diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/README b/TAO/orbsvcs/orbsvcs/PortableGroup/README index 9e1bb9f038c..11bb5115829 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/README +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/README @@ -20,24 +20,18 @@ Implementation Notes: The current MIOP implementation is usable, but not complete according to the final MIOP specification. The following are current limitations: - 1. MIOP packet reassembly. This limits requests to about 5-6K in length - depending on the platform. Miop packet segmentation is available, - but not completely tested. - - 2. Disassociating a group reference from a servant is not implemented. + 1. Disassociating a group reference from a servant is not implemented. If interested in fixing this, be aware of the race conditions if a servant disassociates during an upcall. The TAO event services all have code to handle this issue. - 3. Clients must call _unchecked_narrow to narrow any group references + 2. Clients must call _unchecked_narrow to narrow any group references since a multicast group manager isn't supported yet (and UIPMC does not support twoway invocations) - 4. TAO does not have a multicast group manager, so groups must be + 3. TAO does not have a multicast group manager, so groups must be assigned by creating a MIOP CORBALOC formatted reference. See the tests or the MIOP specification for example group references. - 5. TAO does not have a multicast gateway implementation, so using MIOP + 4. TAO does not have a multicast gateway implementation, so using MIOP is currently limited to networks that natively support multicast - - diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.cpp index fc27fb18ddd..7ec47255288 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.cpp @@ -1,16 +1,12 @@ -// This may look like C, but it's really -*- C++ -*- // $Id$ #include "orbsvcs/PortableGroup/UIPMC_Profile.h" #include "orbsvcs/PortableGroup/UIPMC_Acceptor.h" +#include "orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.h" -#include "tao/MProfile.h" #include "tao/ORB_Core.h" #include "tao/debug.h" -#include "tao/Protocols_Hooks.h" -#include "tao/ORB_Constants.h" -#include "ace/Auto_Ptr.h" #include "ace/os_include/os_netdb.h" #if !defined(__ACE_INLINE__) @@ -26,7 +22,6 @@ TAO_UIPMC_Acceptor::TAO_UIPMC_Acceptor (bool listen_on_all_ifs) endpoint_count_ (0), version_ (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR), orb_core_ (0), - connection_handler_ (0), listen_on_all_(listen_on_all_ifs) { } @@ -92,9 +87,8 @@ TAO_UIPMC_Acceptor::open (TAO_ORB_Core *orb_core, // The hostname cache has already been set! // This is bad mojo, i.e. an internal TAO error. ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) ") - ACE_TEXT ("UIPMC_Acceptor::open - ") - ACE_TEXT ("hostname already set\n\n")), + ACE_TEXT ("TAO (%P|%t) - UIPMC_Acceptor::open, ") + ACE_TEXT ("hostname already set\n")), -1); } @@ -117,9 +111,12 @@ TAO_UIPMC_Acceptor::open (TAO_ORB_Core *orb_core, #if defined (ACE_HAS_IPV6) // Check if this is a (possibly) IPv6 supporting profile containing a // numeric IPv6 address representation. - if ((this->version_.major > TAO_MIN_IPV6_IIOP_MAJOR || - this->version_.minor >= TAO_MIN_IPV6_IIOP_MINOR) && - address[0] == '[') + if ( (this->version_.major > TAO_MIN_IPV6_IIOP_MAJOR + || + (this->version_.major == TAO_MIN_IPV6_IIOP_MAJOR + && + this->version_.minor >= TAO_MIN_IPV6_IIOP_MINOR) + ) && address[0] == '[') { // In this case we have to find the end of the numeric address and // start looking for the port separator from there. @@ -128,9 +125,8 @@ TAO_UIPMC_Acceptor::open (TAO_ORB_Core *orb_core, { // No valid IPv6 address specified. ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - ") - ACE_TEXT ("UIPMC_Acceptor::open, ") - ACE_TEXT ("Invalid IPv6 decimal address specified\n\n")), + ACE_TEXT ("TAO (%P|%t) - UIPMC_Acceptor::open, ") + ACE_TEXT ("Invalid IPv6 decimal address specified\n")), -1); } else @@ -160,9 +156,8 @@ TAO_UIPMC_Acceptor::open (TAO_ORB_Core *orb_core, if (port_separator_loc == 0) { ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - ") - ACE_TEXT ("UIPMC_Acceptor::open, ") - ACE_TEXT ("port is not specified\n\n")), + ACE_TEXT ("TAO (%P|%t) - UIPMC_Acceptor::open, ") + ACE_TEXT ("port is not specified\n")), -1); } @@ -178,10 +173,9 @@ TAO_UIPMC_Acceptor::open (TAO_ORB_Core *orb_core, addr.is_ipv4_mapped_ipv6 ())) { ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - ") - ACE_TEXT ("UIPMC_Acceptor::open, ") + ACE_TEXT ("TAO (%P|%t) - UIPMC_Acceptor::open, ") ACE_TEXT ("non-IPv6 endpoints not allowed when ") - ACE_TEXT ("connect_ipv6_only is set\n\n")), + ACE_TEXT ("connect_ipv6_only is set\n")), -1); } #endif /* ACE_HAS_IPV6 */ @@ -230,13 +224,14 @@ int TAO_UIPMC_Acceptor::open_i (const ACE_INET_Addr& addr, ACE_Reactor *reactor) { - ACE_NEW_RETURN (this->connection_handler_, + TAO_UIPMC_Mcast_Connection_Handler *connection_handler = 0; + ACE_NEW_RETURN (connection_handler, TAO_UIPMC_Mcast_Connection_Handler (this->orb_core_), -1); - this->connection_handler_->local_addr (addr); - this->connection_handler_->listen_on_all (this->listen_on_all_); - if (this->connection_handler_->open (0)) + connection_handler->local_addr (addr); + connection_handler->listen_on_all (this->listen_on_all_); + if (connection_handler->open (0)) { ACE_DEBUG ((LM_ERROR, ACE_TEXT("TAO (%P|%t) - TAO_UIPMC_Acceptor::open_i, ") @@ -246,17 +241,17 @@ TAO_UIPMC_Acceptor::open_i (const ACE_INET_Addr& addr, } int result = - reactor->register_handler (this->connection_handler_, + reactor->register_handler (connection_handler, ACE_Event_Handler::READ_MASK); if (result == -1) { - // Close the handler (this will also delete connection_handler_). - this->connection_handler_->close (); + // Close the handler (this will also delete connection_handler). + connection_handler->close (); return result; } // Connection handler ownership now belongs to the Reactor. - this->connection_handler_->remove_reference (); + connection_handler->remove_reference (); // Set the port for each addr. If there is more than one network // interface then the endpoint created on each interface will be on @@ -271,8 +266,8 @@ TAO_UIPMC_Acceptor::open_i (const ACE_INET_Addr& addr, for (size_t i = 0; i < this->endpoint_count_; ++i) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - UIPMC_Acceptor::open_i ") - ACE_TEXT ("listening on: <%s:%u>\n"), + ACE_TEXT ("TAO (%P|%t) - UIPMC_Acceptor::open_i, ") + ACE_TEXT ("listening on: <%C:%u>\n"), this->hosts_[i], this->addrs_[i].get_port_number ())); } @@ -295,15 +290,14 @@ int TAO_UIPMC_Acceptor::dotted_decimal_address (ACE_INET_Addr &addr, char *&host) { - const char *tmp = addr.get_host_addr (); - if (tmp == 0) + char tmp[INET6_ADDRSTRLEN]; + if (!addr.get_host_addr (tmp, sizeof tmp)) { - if (TAO_debug_level > 0) + if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("\n\nTAO (%P|%t) ") - ACE_TEXT ("UIPMC_Acceptor::dotted_decimal_address ") - ACE_TEXT ("- %p\n\n"), - ACE_TEXT ("cannot determine hostname"))); + ACE_TEXT ("TAO (%P|%t) - UIPMC_Acceptor::") + ACE_TEXT ("dotted_decimal_address, cannot determine ") + ACE_TEXT ("hostname '%m'\n"))); return -1; } diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.h index 13acbf67da8..92cfacea3fc 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Acceptor.h @@ -17,8 +17,6 @@ #include /**/ "ace/pre.h" -#include "orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.h" - #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ @@ -116,8 +114,6 @@ protected: /// Parse protocol specific options. virtual int parse_options (const char *options); -protected: - /// Array of ACE_INET_Addr instances, each one corresponding to a /// given network interface. ACE_INET_Addr *addrs_; @@ -146,9 +142,6 @@ protected: TAO_ORB_Core *orb_core_; private: - - TAO_UIPMC_Mcast_Connection_Handler *connection_handler_; - bool listen_on_all_; }; diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp index 3127c8fae25..b2393e3b349 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp @@ -1,9 +1,9 @@ -// This may look like C, but it's really -*- C++ -*- // $Id$ - #include "orbsvcs/PortableGroup/UIPMC_Connection_Handler.h" #include "orbsvcs/PortableGroup/UIPMC_Endpoint.h" +#include "orbsvcs/PortableGroup/UIPMC_Transport.h" +#include "orbsvcs/PortableGroup/miop_resource.h" #include "tao/Timeprobe.h" #include "tao/debug.h" @@ -22,7 +22,8 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_UIPMC_Connection_Handler::TAO_UIPMC_Connection_Handler (ACE_Thread_Manager *t) : TAO_UIPMC_SVC_HANDLER (t, 0 , 0), TAO_Connection_Handler (0), - dscp_codepoint_ (IPDSFIELD_DSCP_DEFAULT << 2) + dscp_codepoint_ (IPDSFIELD_DSCP_DEFAULT << 2), + send_hi_water_mark_ (0u) { // This constructor should *never* get called, it is just here to // make the compiler happy: the default implementation of the @@ -35,11 +36,12 @@ TAO_UIPMC_Connection_Handler::TAO_UIPMC_Connection_Handler (ACE_Thread_Manager * TAO_UIPMC_Connection_Handler::TAO_UIPMC_Connection_Handler (TAO_ORB_Core *orb_core) : TAO_UIPMC_SVC_HANDLER (orb_core->thr_mgr (), 0, 0), TAO_Connection_Handler (orb_core), - dscp_codepoint_ (IPDSFIELD_DSCP_DEFAULT << 2) + dscp_codepoint_ (IPDSFIELD_DSCP_DEFAULT << 2), + send_hi_water_mark_ (0u) { - UIPMC_TRANSPORT* specific_transport = 0; + TAO_UIPMC_Transport* specific_transport = 0; ACE_NEW(specific_transport, - UIPMC_TRANSPORT (this, orb_core)); + TAO_UIPMC_Transport (this, orb_core)); // store this pointer (indirectly increment ref count) this->transport (specific_transport); @@ -54,14 +56,14 @@ TAO_UIPMC_Connection_Handler::~TAO_UIPMC_Connection_Handler (void) if (result == -1 && TAO_debug_level) { ACE_ERROR ((LM_ERROR, - ACE_TEXT("TAO (%P|%t) - UIPMC_Connection_Handler::") - ACE_TEXT("~UIPMC_Connection_Handler, ") - ACE_TEXT("release_os_resources() failed %m\n"))); + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connection_Handler::") + ACE_TEXT ("~UIPMC_Connection_Handler, ") + ACE_TEXT ("release_os_resources() failed %m\n"))); } } const ACE_INET_Addr & -TAO_UIPMC_Connection_Handler::addr (void) +TAO_UIPMC_Connection_Handler::addr (void) const { return this->addr_; } @@ -73,24 +75,21 @@ TAO_UIPMC_Connection_Handler::addr (const ACE_INET_Addr &addr) } const ACE_INET_Addr & -TAO_UIPMC_Connection_Handler::local_addr (void) +TAO_UIPMC_Connection_Handler::local_addr (void) const { - return local_addr_; + return this->local_addr_; } void TAO_UIPMC_Connection_Handler::local_addr (const ACE_INET_Addr &addr) { - local_addr_ = addr; + this->local_addr_ = addr; } -ssize_t -TAO_UIPMC_Connection_Handler::send (const iovec iov[], - int n, - const ACE_Addr &addr, - int flags) const +u_long +TAO_UIPMC_Connection_Handler::send_hi_water_mark (void) const { - return this->peer ().send (iov, n, addr, flags); + return this->send_hi_water_mark_; } int @@ -102,12 +101,20 @@ TAO_UIPMC_Connection_Handler::open_handler (void *v) int TAO_UIPMC_Connection_Handler::open (void*) { + TAO_MIOP_Resource_Factory *const factory = + ACE_Dynamic_Service<TAO_MIOP_Resource_Factory>::instance ( + this->orb_core ()->configuration(), + ACE_TEXT ("MIOP_Resource_Factory")); + // Since only client can send data over MIOP // then ttl is only applicable to client socket. - TAO_DIOP_Protocol_Properties protocol_properties; // Initialize values from ORB params. + protocol_properties.send_buffer_size_ = + factory->send_buffer_size () ? + factory->send_buffer_size () : + this->orb_core ()->orb_params ()->sock_sndbuf_size (); protocol_properties.hop_limit_ = this->orb_core ()->orb_params ()->ip_hoplimit (); protocol_properties.enable_multicast_loop_ = @@ -130,14 +137,12 @@ TAO_UIPMC_Connection_Handler::open (void*) this->peer ().open (this->local_addr_); - if (TAO_debug_level > 5) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - UIPMC_Connection_Handler::open, ") - ACE_TEXT("listening on: <%s:%u>\n"), - this->local_addr_.get_host_addr (), - this->local_addr_.get_port_number ())); - } + if (this->set_socket_option (this->peer (), + protocol_properties.send_buffer_size_, + 0) == -1) + { + return -1; + } if (protocol_properties.hop_limit_ >= 0) { @@ -184,7 +189,7 @@ TAO_UIPMC_Connection_Handler::open (void*) { ACE_ERROR ((LM_ERROR, ACE_TEXT("TAO (%P|%t) - UIPMC_Connection_Handler::open, ") - ACE_TEXT("couldn't set hop limit\n\n"))); + ACE_TEXT("couldn't set hop limit '%m'\n"))); } return -1; } @@ -235,16 +240,62 @@ TAO_UIPMC_Connection_Handler::open (void*) if (TAO_debug_level) { ACE_ERROR ((LM_ERROR, - ACE_TEXT("TAO (%P|%t) - UIPMC_Connection_Handler::open, ") - ACE_TEXT("couldn't %s multicast packets looping %p\n"), + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connection_Handler::open, ") + ACE_TEXT ("couldn't %s multicast packets looping '%m'\n"), protocol_properties.enable_multicast_loop_ ? - ACE_TEXT("enable") : ACE_TEXT("disable"), - ACE_TEXT("errno") + ACE_TEXT ("enable") : ACE_TEXT ("disable") )); } return -1; } + this->send_hi_water_mark_ = factory->send_hi_water_mark (); + if (!this->send_hi_water_mark_) + { +#if defined (ACE_LACKS_SO_SNDBUF) + // Assume a small buffer + this->send_hi_water_mark_ = 1024u; + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connection_Handler::") + ACE_TEXT ("open, -ORBSendHighWaterMark not specified ") + ACE_TEXT ("using %u bytes\n"), + this->send_hi_water_mark_)); +#else + int size = sizeof (this->send_hi_water_mark_); + result = + this->peer ().get_option (SOL_SOCKET, + SO_SNDBUF, + static_cast<void *> (&this->send_hi_water_mark_), + &size); + if (!result) + { + // Note unix kernals double the value that is set (to hold the + // internal data structures seporating each packet) and this doubled + // value is what is returned by the get_option, so it is best to halve. + this->send_hi_water_mark_ >>= 1; + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connection_Handler::") + ACE_TEXT ("open, -ORBSendHighWaterMark not specified, ") + ACE_TEXT ("using -ORBSndSock value of %u bytes\n"), + this->send_hi_water_mark_)); + } + else + { + // Assume a small buffer + this->send_hi_water_mark_ = 1024u; + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connection_Handler::") + ACE_TEXT ("open, -ORBSendHighWaterMark not specified ") + ACE_TEXT ("and getsockopt failed '%m', using %u bytes\n"), + this->send_hi_water_mark_)); + return -1; + } +#endif // defined (ACE_LACKS_SO_SNDBUF) + } + // Set that the transport is now connected, if fails we return -1 // Use C-style cast b/c otherwise we get warnings on lots of // compilers @@ -379,8 +430,9 @@ TAO_UIPMC_Connection_Handler::set_tos (int tos) if (TAO_debug_level) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - UIPMC_Connection_Handler::" - "set_dscp_codepoint -> IPV6_TCLASS not supported yet\n")); + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connection_Handler::") + ACE_TEXT ("set_dscp_codepoint -> IPV6_TCLASS not ") + ACE_TEXT ("supported yet\n"))); } return 0; } @@ -400,11 +452,13 @@ TAO_UIPMC_Connection_Handler::set_tos (int tos) if (TAO_debug_level) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - UIPMC_Connection_Handler::" - "set_dscp_codepoint -> dscp: %x; result: %d; %s\n", + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connection_Handler::") + ACE_TEXT ("set_dscp_codepoint -> dscp: %x; ") + ACE_TEXT ("result: %d; %s\n"), tos, result, - result == -1 ? "try running as superuser" : "")); + result == -1 ? + ACE_TEXT ("try running as superuser") : ACE_TEXT (""))); } // On successful setting of TOS field. diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.h index 75e2e1bd831..39cd1dfd704 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.h @@ -21,8 +21,6 @@ #pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "orbsvcs/PortableGroup/UIPMC_Transport.h" - #include "tao/Wait_Strategy.h" #include "tao/Connection_Handler.h" @@ -30,7 +28,6 @@ #include "ace/Reactor.h" #include "ace/SOCK_Dgram.h" - TAO_BEGIN_VERSIONED_NAMESPACE_DECL // This connection handler. @@ -60,7 +57,7 @@ public: TAO_UIPMC_Connection_Handler (ACE_Thread_Manager* t = 0); - /// Constructor. arg parameter is used by the Acceptor to pass the + /// Constructor. arg parameter is used by the Acceptor to pass the /// protocol configuration properties for this connection. TAO_UIPMC_Connection_Handler (TAO_ORB_Core *orb_core); @@ -99,20 +96,15 @@ public: int set_dscp_codepoint (CORBA::Long dscp_codepoint); // UIPMC Additions - Begin - const ACE_INET_Addr &addr (void); + const ACE_INET_Addr &addr (void) const; void addr (const ACE_INET_Addr &addr); - const ACE_INET_Addr &local_addr (void); + const ACE_INET_Addr &local_addr (void) const; void local_addr (const ACE_INET_Addr &addr); - /// This is only to be able to use client and server - /// connection handlers in the same way in transport. - ssize_t send (const iovec iov[], - int n, - const ACE_Addr &addr, - int flags = 0) const; + u_long send_hi_water_mark (void) const; // UIPMC Additions - End protected: @@ -143,15 +135,11 @@ private: /// Stores the type of service value. int dscp_codepoint_; -}; - -// Transport for this handler. -typedef TAO_UIPMC_Transport<TAO_UIPMC_Connection_Handler> - UIPMC_TRANSPORT; -#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT -template class TAO_PortableGroup_Export TAO_UIPMC_Transport<TAO_UIPMC_Connection_Handler>; -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + /// How much data can be sent without delays. It defaults to the size + /// of the socket buffer. + u_long send_hi_water_mark_; +}; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.cpp index 68b1717f790..6f487bf45a6 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.cpp @@ -1,17 +1,19 @@ // $Id$ -#include "orbsvcs/PortableGroup/UIPMC_Profile.h" #include "orbsvcs/PortableGroup/UIPMC_Connector.h" +#include "orbsvcs/PortableGroup/UIPMC_Profile.h" +#include "orbsvcs/PortableGroup/UIPMC_Connection_Handler.h" #include "tao/debug.h" #include "tao/ORB_Core.h" -#include "tao/Base_Transport_Property.h" -#include "tao/Protocols_Hooks.h" +#include "tao/Transport_Descriptor_Interface.h" +#include "tao/Transport.h" #include "tao/Thread_Lane_Resources.h" #include "ace/Connector.h" #include "ace/OS_NS_strings.h" #include "ace/os_include/os_netdb.h" +#include "ace/OS_NS_sys_socket.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL @@ -111,7 +113,8 @@ TAO_UIPMC_Connector::make_connection (TAO::Profile_Transport_Resolver *, ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - UIPMC_Connector::open, ") - ACE_TEXT ("invalid connection to IPv4 mapped IPv6 interface <%s>!\n"), + ACE_TEXT ("invalid connection to IPv4 mapped IPv6 ") + ACE_TEXT ("interface <%s>!\n"), remote_as_string)); } return 0; @@ -139,8 +142,7 @@ TAO_UIPMC_Connector::make_connection (TAO::Profile_Transport_Resolver *, ACE_INET_Addr local_addr(any_addr); svc_handler->addr (remote_address); - int retval = 0; - + int retval= 0; while (uipmc_endpoint != 0) { if (uipmc_endpoint->is_preferred_network ()) @@ -205,14 +207,29 @@ TAO_UIPMC_Connector::make_connection (TAO::Profile_Transport_Resolver *, return 0; } + // After the handler is opened we can try to obtain the real local address. + svc_handler->peer ().get_local_addr (local_addr); + svc_handler->local_addr (local_addr); + if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - UIPMC_Connector::make_connection, ") - ACE_TEXT ("new connection on HANDLE %d\n"), - svc_handler->get_handle ())); + { + char local_hostaddr[INET6_ADDRSTRLEN]; + local_addr.get_host_addr (local_hostaddr, sizeof local_hostaddr); + char remote_hostaddr[INET6_ADDRSTRLEN]; + remote_address.get_host_addr (remote_hostaddr, sizeof remote_hostaddr); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connector::make_connection, ") + ACE_TEXT ("new connection from <%C:%u> to <%C:%u> on ") + ACE_TEXT ("HANDLE %d\n"), + local_hostaddr, + local_addr.get_port_number (), + remote_hostaddr, + remote_address.get_port_number (), + svc_handler->get_handle ())); + } - UIPMC_TRANSPORT *transport = - dynamic_cast<UIPMC_TRANSPORT *> (svc_handler->transport ()); + TAO_Transport *transport = svc_handler->transport (); // In case of errors transport is zero if (transport == 0) @@ -220,12 +237,13 @@ TAO_UIPMC_Connector::make_connection (TAO::Profile_Transport_Resolver *, svc_handler->close (); // Give users a clue to the problem. - if (TAO_debug_level > 3) + if (TAO_debug_level) ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - UIPMC_Connector::make_connection, " - "connection to <%C:%u> failed (%p)\n", - remote_address.get_host_addr (), - remote_address.get_port_number (), + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connector:") + ACE_TEXT (":make_connection, connection to ") + ACE_TEXT ("<%C:%u> failed (%p)\n"), + uipmc_endpoint->host (), + uipmc_endpoint->port (), ACE_TEXT ("errno"))); return 0; @@ -241,11 +259,12 @@ TAO_UIPMC_Connector::make_connection (TAO::Profile_Transport_Resolver *, { svc_handler->close (); - if (TAO_debug_level > 0) + if (TAO_debug_level) { ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - UIPMC_Connector::make_connection, " - "could not add the new connection to cache\n")); + ACE_TEXT ("TAO (%P|%t) - UIPMC_Connector::") + ACE_TEXT ("make_connection, could not add the ") + ACE_TEXT ("new connection to cache\n"))); } return 0; @@ -331,4 +350,3 @@ TAO_UIPMC_Connector::cancel_svc_handler ( } TAO_END_VERSIONED_NAMESPACE_DECL - diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.h index 52691673b81..6751c306f85 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connector.h @@ -23,10 +23,7 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "orbsvcs/PortableGroup/UIPMC_Connection_Handler.h" - #include "tao/Transport_Connector.h" -#include "ace/Null_Mutex.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Endpoint.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Endpoint.cpp index da6e5ea716c..ad7a5305430 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Endpoint.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Endpoint.cpp @@ -63,8 +63,9 @@ void TAO_UIPMC_Endpoint::object_addr (const ACE_INET_Addr &addr) { this->port_ = addr.get_port_number(); - this->host_ = CORBA::string_dup (addr.get_host_addr ()); - + char tmp[INET6_ADDRSTRLEN]; + addr.get_host_addr (tmp, sizeof tmp); + this->host_ = CORBA::string_dup (tmp); this->object_addr_.set (addr); } @@ -77,10 +78,13 @@ TAO_UIPMC_Endpoint::host (void) const int TAO_UIPMC_Endpoint::addr_to_string (char *buffer, size_t length) { + char tmp[INET6_ADDRSTRLEN]; + this->object_addr_.get_host_addr (tmp, sizeof tmp); + size_t actual_len = - ACE_OS::strlen (this->object_addr_.get_host_addr ()) // chars in host name - + sizeof (':') // delimiter - + 5 // max port + ACE_OS::strlen (tmp) // chars in host name + + sizeof (':') // delimiter + + 5u // max port + sizeof ('\0'); #if defined (ACE_HAS_IPV6) @@ -93,13 +97,13 @@ TAO_UIPMC_Endpoint::addr_to_string (char *buffer, size_t length) #if defined (ACE_HAS_IPV6) if (this->object_addr_.get_type () == AF_INET6) - ACE_OS::sprintf (buffer, "[%s]:%d", - this->object_addr_.get_host_addr (), + ACE_OS::sprintf (buffer, "[%s]:%u", + tmp, this->port_); else #endif /* ACE_HAS_IPV6 */ - ACE_OS::sprintf (buffer, "%s:%d", - this->object_addr_.get_host_addr (), + ACE_OS::sprintf (buffer, "%s:%u", + tmp, this->port_); return 0; diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Factory.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Factory.cpp index de8fc0c8430..911c269fc35 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Factory.cpp @@ -3,8 +3,9 @@ #include "orbsvcs/PortableGroup/UIPMC_Factory.h" #include "orbsvcs/PortableGroup/UIPMC_Acceptor.h" #include "orbsvcs/PortableGroup/UIPMC_Connector.h" -#include "ace/OS_NS_strings.h" #include "tao/ORB_Constants.h" +#include "ace/OS_NS_strings.h" +#include "ace/UUID.h" static const char the_prefix[] = "uipmc"; @@ -45,9 +46,8 @@ TAO_Acceptor * TAO_UIPMC_Protocol_Factory::make_acceptor (void) { TAO_Acceptor *acceptor = 0; - ACE_NEW_RETURN (acceptor, - TAO_UIPMC_Acceptor(this->listen_on_all_), + TAO_UIPMC_Acceptor (this->listen_on_all_), 0); return acceptor; @@ -85,8 +85,11 @@ TAO_UIPMC_Protocol_Factory::init (int argc, TAO_Connector * TAO_UIPMC_Protocol_Factory::make_connector (void) { - TAO_Connector *connector = 0; + // This is done only once when the library is loaded and + // only on the client side. + ACE_Utils::UUID_GENERATOR::instance ()->init (); + TAO_Connector *connector = 0; ACE_NEW_RETURN (connector, TAO_UIPMC_Connector, 0); @@ -112,7 +115,7 @@ ACE_STATIC_SVC_DEFINE (TAO_UIPMC_Protocol_Factory, ACE_SVC_OBJ_T, &ACE_SVC_NAME (TAO_UIPMC_Protocol_Factory), ACE_Service_Type::DELETE_THIS | - ACE_Service_Type::DELETE_OBJ, + ACE_Service_Type::DELETE_OBJ, 0) ACE_FACTORY_DEFINE (TAO_PortableGroup, TAO_UIPMC_Protocol_Factory) diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.cpp index b6f1861a59a..76d1cba6007 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.cpp @@ -1,8 +1,9 @@ -// This may look like C, but it's really -*- C++ -*- // $Id$ #include "orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.h" #include "orbsvcs/PortableGroup/UIPMC_Endpoint.h" +#include "orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h" +#include "orbsvcs/PortableGroup/miop_resource.h" #include "tao/Timeprobe.h" #include "tao/debug.h" @@ -38,9 +39,9 @@ TAO_UIPMC_Mcast_Connection_Handler::TAO_UIPMC_Mcast_Connection_Handler ( TAO_Connection_Handler (orb_core), listen_on_all_(false) { - UIPMC_MULTICAST_TRANSPORT* specific_transport = 0; + TAO_UIPMC_Mcast_Transport *specific_transport = 0; ACE_NEW(specific_transport, - UIPMC_MULTICAST_TRANSPORT (this, orb_core)); + TAO_UIPMC_Mcast_Transport (this, orb_core)); // store this pointer (indirectly increment ref count) this->transport (specific_transport); @@ -55,14 +56,14 @@ TAO_UIPMC_Mcast_Connection_Handler::~TAO_UIPMC_Mcast_Connection_Handler (void) if (result == -1 && TAO_debug_level) { ACE_ERROR ((LM_ERROR, - ACE_TEXT("TAO (%P|%t) - UIPMC_Mcast_Connection_Handler::") - ACE_TEXT("~UIPMC_Mcast_Connection_Handler, ") - ACE_TEXT("release_os_resources() failed %m\n"))); + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Connection_Handler::") + ACE_TEXT ("~UIPMC_Mcast_Connection_Handler, ") + ACE_TEXT ("release_os_resources() failed '%m'\n"))); } } const ACE_INET_Addr & -TAO_UIPMC_Mcast_Connection_Handler::addr (void) +TAO_UIPMC_Mcast_Connection_Handler::addr (void) const { return this->addr_; } @@ -74,25 +75,15 @@ TAO_UIPMC_Mcast_Connection_Handler::addr (const ACE_INET_Addr &addr) } const ACE_INET_Addr & -TAO_UIPMC_Mcast_Connection_Handler::local_addr (void) +TAO_UIPMC_Mcast_Connection_Handler::local_addr (void) const { - return local_addr_; + return this->local_addr_; } void TAO_UIPMC_Mcast_Connection_Handler::local_addr (const ACE_INET_Addr &addr) { - local_addr_ = addr; -} - -ssize_t -TAO_UIPMC_Mcast_Connection_Handler::send (const iovec [], - int, - const ACE_Addr &, - int) const -{ - ACE_ASSERT (0); - return -1; + this->local_addr_ = addr; } int @@ -109,32 +100,66 @@ TAO_UIPMC_Mcast_Connection_Handler::open (void*) this->peer ().opts(ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL | this->peer ().opts()); } + TAO_MIOP_Resource_Factory *const factory = + ACE_Dynamic_Service<TAO_MIOP_Resource_Factory>::instance ( + this->orb_core ()->configuration(), + ACE_TEXT ("MIOP_Resource_Factory")); + TAO_DIOP_Protocol_Properties protocol_properties; + protocol_properties.recv_buffer_size_ = + factory->receive_buffer_size () ? + factory->receive_buffer_size () : + this->orb_core ()->orb_params ()->sock_rcvbuf_size (); + if (this->peer ().join (this->local_addr_) == 0) { if (TAO_debug_level > 5) - { + { + char tmp[INET6_ADDRSTRLEN]; + this->local_addr_.get_host_addr (tmp, sizeof tmp); ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - UIPMC_Mcast_Connection_Handler::open, ") - ACE_TEXT("subscribed to multicast group at <%s:%d>\n"), - this->local_addr_.get_host_addr (), + ACE_TEXT("subscribed to multicast group at %C:%u\n"), + tmp, this->local_addr_.get_port_number () )); - } + } } +#ifndef ALLOW_UNICAST_MIOP else { - ACE_DEBUG ((LM_ERROR, - ACE_TEXT("TAO (%P|%t) - UIPMC_Mcast_Connection_Handler::open, ") - ACE_TEXT("failed to subscribe to multicast group at <%s:%d>%p\n"), - this->local_addr_.get_host_addr (), - this->local_addr_.get_port_number (), - ACE_TEXT(". Errno") - )); - return -1; + char tmp[INET6_ADDRSTRLEN]; + this->local_addr_.get_host_addr (tmp, sizeof tmp); + ACE_DEBUG ((LM_ERROR, + ACE_TEXT("TAO (%P|%t) - UIPMC_Mcast_Connection_Handler::open, ") + ACE_TEXT("failed to subscribe to multicast group at %C:%u '%m'\n"), + tmp, + this->local_addr_.get_port_number () + )); + return -1; } +#endif // ALLOW_UNICAST_MIOP - this->transport ()->id ((size_t) this->peer ().get_handle ()); + if (this->set_socket_option (this->peer (), + 0, + protocol_properties.recv_buffer_size_) == -1) + { + return -1; + } + + // The socket has to be set in non-blocking mode in order to work with + // TAO_UIPMC_Mcast_Transport::handle_input(). + if (this->peer ().enable (ACE_NONBLOCK) == -1) + { + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Connection_Handler::") + ACE_TEXT ("open, failed to set to non-blocking mode ") + ACE_TEXT ("'%m'\n"))); + return -1; + } + + this->transport ()->id ((size_t) this->peer ().get_handle ()); return 0; } diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.h index c3d41a70e9b..91653af3549 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.h @@ -21,8 +21,6 @@ #pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "orbsvcs/PortableGroup/UIPMC_Transport.h" - #include "tao/Wait_Strategy.h" #include "tao/Connection_Handler.h" @@ -30,7 +28,6 @@ #include "ace/Reactor.h" #include "ace/SOCK_Dgram_Mcast.h" - TAO_BEGIN_VERSIONED_NAMESPACE_DECL // This connection handler. @@ -94,21 +91,14 @@ public: int add_transport_to_cache (void); // UIPMC Additions - Begin - const ACE_INET_Addr &addr (void); + const ACE_INET_Addr &addr (void) const; void addr (const ACE_INET_Addr &addr); - const ACE_INET_Addr &local_addr (void); + const ACE_INET_Addr &local_addr (void) const; void local_addr (const ACE_INET_Addr &addr); - /// This is only to be able to use client and server - /// connection handlers in the same way in transport. - ssize_t send (const iovec iov[], - int n, - const ACE_Addr &addr, - int flags = 0) const; - /// Set this to listen on all interfaces void listen_on_all(bool value); @@ -136,17 +126,8 @@ protected: virtual int release_os_resources (void); virtual int handle_write_ready (const ACE_Time_Value *timeout); //@} - }; -// Transport for this handler. -typedef TAO_UIPMC_Transport<TAO_UIPMC_Mcast_Connection_Handler> - UIPMC_MULTICAST_TRANSPORT; - -#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT -template class TAO_PortableGroup_Export TAO_UIPMC_Transport<TAO_UIPMC_Mcast_Connection_Handler>; -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ - TAO_END_VERSIONED_NAMESPACE_DECL #include /**/ "ace/post.h" diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp new file mode 100644 index 00000000000..8991a1ad1a1 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.cpp @@ -0,0 +1,498 @@ +// $Id$ + +#include "orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h" +#include "orbsvcs/PortableGroup/miopconf.h" +#include "orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.h" +#include "orbsvcs/PortableGroup/UIPMC_Mcast_Connection_Handler.h" +#include "orbsvcs/PortableGroup/UIPMC_Wait_Never.h" +#include "orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.h" +#include "orbsvcs/PortableGroup/miop_resource.h" + +#include "tao/ORB_Core.h" +#include "tao/debug.h" +#include "tao/GIOP_Message_Base.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO_UIPMC_Mcast_Transport::TAO_UIPMC_Mcast_Transport ( + TAO_UIPMC_Mcast_Connection_Handler *handler, + TAO_ORB_Core *orb_core +) + : TAO_Transport (IOP::TAG_UIPMC, + orb_core) + , connection_handler_ (handler) +{ + // Replace the default wait strategy with our own + // since we don't support waiting on anything. + delete this->ws_; + ACE_NEW (this->ws_, + TAO_UIPMC_Wait_Never (this)); +} + +TAO_UIPMC_Mcast_Transport::~TAO_UIPMC_Mcast_Transport (void) +{ + // Cleanup all packets. + this->cleanup_packets (false); +} + +void +TAO_UIPMC_Mcast_Transport::cleanup_packets (bool expired_only) +{ + if (expired_only) + { + TAO_MIOP_Resource_Factory *factory = + ACE_Dynamic_Service<TAO_MIOP_Resource_Factory>::instance ( + this->orb_core_->configuration(), + ACE_TEXT ("MIOP_Resource_Factory")); + + TAO_PG::Fragments_Cleanup_Strategy *cleanup_strategy = + factory->fragments_cleanup_strategy (); + + cleanup_strategy->cleanup (this->incomplete_); + } + else + { + for (Packets_Map::iterator iter = this->incomplete_.begin (); + iter != this->incomplete_.end ();) + { + // Move forward iter because what it was pointing to could be + // unbound at the end of the loop leaving the iterator pointing + // to removed entry. + Packets_Map::iterator cur_iter = iter++; + + if (TAO_debug_level >= 8) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("cleanup_packets, cleaning %d bytes\n"), + this->id (), + (*cur_iter).item ()->data_length ())); + } + + ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> guard ((*cur_iter).item ()); + this->incomplete_.unbind (cur_iter); + } + } +} + +ACE_Event_Handler * +TAO_UIPMC_Mcast_Transport::event_handler_i (void) +{ + return this->connection_handler_; +} + +TAO_Connection_Handler * +TAO_UIPMC_Mcast_Transport::connection_handler_i (void) +{ + return this->connection_handler_; +} + +ssize_t TAO_UIPMC_Mcast_Transport::send ( + iovec *, + int, + size_t &, + ACE_Time_Value const *) +{ + // Write the complete Message_Block chain to the connection. + // Shouldn't ever be called on the server side. + ACE_ASSERT (0); + return -1; +} + +ssize_t TAO_UIPMC_Mcast_Transport::recv ( + char *, + size_t, + ACE_Time_Value const *) +{ + // Shouldn't ever be called. We use recv_all() with different semantics. + ACE_ASSERT (0); + return -1; +} + +int TAO_UIPMC_Mcast_Transport::send_request ( + TAO_Stub *, + TAO_ORB_Core *, + TAO_OutputCDR &, + TAO_Message_Semantics, + ACE_Time_Value *) +{ + // Shouldn't ever be called on the server side. + ACE_ASSERT (0); + return -1; +} + +int TAO_UIPMC_Mcast_Transport::send_message ( + TAO_OutputCDR &, + TAO_Stub *, + TAO_ServerRequest *, + TAO_Message_Semantics, + ACE_Time_Value *) +{ + // Shouldn't ever be called on the server side. + ACE_ASSERT (0); + return -1; +} + +char * +TAO_UIPMC_Mcast_Transport::recv_packet ( + char *buf, + size_t len, + ACE_INET_Addr &from_addr, + CORBA::UShort &packet_length, + CORBA::ULong &packet_number, + bool &stop_packet, + u_long &id_hash) const +{ + // We read the whole MIOP packet which is not longer than MIOP_MAX_DGRAM_SIZE. + ssize_t const n = + this->connection_handler_->peer ().recv (buf, + len, + from_addr); + + // There is nothing left in the socket buffer. + if (n <= 0) + return 0; + + // Make sure that we at least have a MIOP header. + if (static_cast<size_t> (n) < MIOP_MIN_HEADER_SIZE) + { + if (TAO_debug_level) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_packet, packet of size %d is ") + ACE_TEXT ("too small\n"), + this->id (), + n)); + } + + return 0; + } + + // Use CDR for reading fields from MIOP packet header. + // buf must be properly aligned. + TAO_InputCDR miop_hdr (buf, n); + + // Check for MIOP magic bytes. + CORBA::Octet miop_magic_recv[sizeof miop_magic]; + miop_hdr.read_octet_array (miop_magic_recv, sizeof miop_magic_recv); + if (miop_magic_recv[0] != miop_magic [0] || + miop_magic_recv[1] != miop_magic [1] || + miop_magic_recv[2] != miop_magic [2] || + miop_magic_recv[3] != miop_magic [3]) + { + if (TAO_debug_level) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_packet, packet didn't contain ") + ACE_TEXT ("magic bytes\n"), + this->id ())); + } + + return 0; + } + + // Check MIOP version. + CORBA::Octet miop_version; + miop_hdr.read_octet (miop_version); + if (miop_version != + ((TAO_DEF_MIOP_MAJOR & 0xf) << 4) + (TAO_DEF_MIOP_MINOR & 0xf)) + { + if (TAO_debug_level) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_packet, packet has wrong version ") + ACE_TEXT ("%d.%d\n"), + this->id (), + (miop_version >> 4) & 0xf, + miop_version & 0xf)); + } + + return 0; + } + + CORBA::Octet miop_flags; + miop_hdr.read_octet (miop_flags); + + // Retrieve the stop packet marker. + stop_packet = miop_flags & 0x02; + + // Set the byte order. + // 0 = Big endian + // 1 = Little endian + miop_hdr.reset_byte_order (miop_flags & 0x01); + + miop_hdr.read_ushort (packet_length); + + miop_hdr.read_ulong (packet_number); + + // We don't use number_of_packets since it's optional in the spec. + CORBA::ULong number_of_packets; + miop_hdr.read_ulong (number_of_packets); + + CORBA::ULong id_length; + miop_hdr.read_ulong (id_length); + + if (id_length > static_cast<CORBA::ULong> (MIOP_MAX_ID_LENGTH) || + static_cast<CORBA::ULong> (MIOP_ID_CONTENT_OFFSET) + + id_length + packet_length != static_cast<CORBA::ULong> (n)) + { + if (TAO_debug_level) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_packet, malformed packet\n"), + this->id ())); + } + + return 0; + } + + ssize_t const miop_header_size = + (MIOP_ID_CONTENT_OFFSET + id_length + 7) & ~0x7; + if (miop_header_size > n) + { + if (TAO_debug_level) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv_packet, packet not large enough ") + ACE_TEXT ("for padding\n"), + this->id ())); + } + + return 0; + } + + id_hash = ACE::hash_pjw (&buf[MIOP_ID_CONTENT_OFFSET], id_length); + +#if 0 + // Normally we don't need to log this but if someone will want. Here we are. + if (TAO_debug_level >= 10) + { + ACE_HEX_DUMP ((LM_DEBUG, + (char const *) buf, + miop_header_size, + ACE_TEXT ("MIOP header"))); + } +#endif + + // Return a pointer to data at 8 byte boundary. + return &buf[miop_header_size]; +} + +bool +TAO_UIPMC_Mcast_Transport::recv_all (void) +{ + // Only one thread will do recv. + ACE_Guard<TAO_SYNCH_MUTEX> recv_guard (this->recv_lock_, 0); // tryacquire + if (!recv_guard.locked ()) + return !this->complete_.is_empty (); + + // The buffer on the stack which will be used to hold the input + // messages. + char buf [MIOP_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT]; + char *aligned_buf = ACE_ptr_align_binary (buf, ACE_CDR::MAX_ALIGNMENT); + +#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) + (void) ACE_OS::memset (buf, + '\0', + sizeof buf); +#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ + + while (true) + { + // This guard will cleanup expired packets each iteration. + TAO_PG::UIPMC_Recv_Packet_Cleanup_Guard guard (this); + + ACE_INET_Addr from_addr; + CORBA::UShort packet_length; + CORBA::ULong packet_number; + bool stop_packet; + u_long id_hash; + + char *start_data = + this->recv_packet (aligned_buf, MIOP_MAX_DGRAM_SIZE, from_addr, + packet_length, packet_number, stop_packet, id_hash); + + // The socket buffer is empty. Try to do other useful things. + if (start_data == 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) + break; + + if (TAO_debug_level >= 10) + { + char tmp[INET6_ADDRSTRLEN]; + from_addr.get_host_addr (tmp, sizeof tmp); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("recv, received %d bytes from <%C:%u> ") + ACE_TEXT ("(hash %d)\n"), + this->id (), + packet_length, + tmp, + from_addr.get_port_number (), + id_hash)); + } + + TAO_PG::UIPMC_Recv_Packet *packet = 0; + if (this->incomplete_.find (id_hash, packet) == -1) + { + ACE_NEW_THROW_EX (packet, + TAO_PG::UIPMC_Recv_Packet, + CORBA::NO_MEMORY ( + CORBA::SystemException::_tao_minor_code ( + TAO::VMCID, + ENOMEM), + CORBA::COMPLETED_NO)); + + if (this->incomplete_.bind (id_hash, packet) != 0) + { + // Cleanup the packet. + ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> bail_guard (packet); + continue; + } + } + + // We have incomplete packet so add a new data to it. + int const ret = packet->add_fragment (start_data, packet_length, + packet_number, stop_packet); + + // add_fragment returns 1 iff the packet is complete. + if (ret == 1) + { + // Remove this packet from incomplete packets. + this->incomplete_.unbind (id_hash); + + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, + guard, + this->complete_lock_, + !this->complete_.is_empty ()); + + // Add it to the complete queue. + this->complete_.enqueue_tail (packet); + } + } + + return !this->complete_.is_empty (); +} + +int +TAO_UIPMC_Mcast_Transport::handle_input ( + TAO_Resume_Handle &rh, + ACE_Time_Value *) +{ + // Note: We should not ever return -1 from this function. This will close + // server connection which we don't want to happen here. + + if (TAO_debug_level >= 8) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("handle_input, started\n"), + this->id ())); + } + + while (this->recv_all ()) + { + TAO_PG::UIPMC_Recv_Packet *complete = 0; + + { + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->complete_lock_, 0); + if (this->complete_.dequeue_head (complete) == -1) + return 0; + } + + ACE_Auto_Ptr<TAO_PG::UIPMC_Recv_Packet> owner (complete); + + // Create a data block. + ACE_Data_Block db (complete->data_length (), + ACE_Message_Block::MB_DATA, + 0, + this->orb_core_->input_cdr_buffer_allocator (), + this->orb_core_->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + this->orb_core_->input_cdr_dblock_allocator ()); + + // Create a message block + ACE_Message_Block message_block ( + &db, + ACE_Message_Block::DONT_DELETE, + this->orb_core_->input_cdr_msgblock_allocator ()); + + // Align the message block. + ACE_CDR::mb_align (&message_block); + + complete->copy_data (message_block.wr_ptr ()); + + // Set the write pointer in the stack buffer. + message_block.wr_ptr (complete->data_length ()); + + // Make a node of the message block. + TAO_Queued_Data qd (&message_block); + size_t mesg_length = 0; + + // Parse the incoming message for validity. The check needs to be + // performed by the messaging objects. + if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1) + { + if (TAO_debug_level) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("handle_input, failed to parse input\n"), + this->id ())); + } + continue; + } + + if (qd.missing_data () == TAO_MISSING_DATA_UNDEFINED) + { + // Parse/marshal error happened. + if (TAO_debug_level) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("handle_input, got missing data\n"), + this->id ())); + } + continue; + } + + if (message_block.length () > mesg_length) + { + // We read too much data. + if (TAO_debug_level) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Mcast_Transport[%d]::") + ACE_TEXT ("handle_input, read %d but expected %d\n"), + this->id (), + message_block.length (), + mesg_length)); + } + continue; + } + + // Process the message. + (void) this->process_parsed_messages (&qd, rh); + } + + return 0; +} + +int +TAO_UIPMC_Mcast_Transport::register_handler (void) +{ + // We never register the handler with the reactor + // as we never need to be informed about any incoming data, + // assuming we only use one-ways. + // If we would register and ICMP Messages would arrive, e.g + // due to a not reachable server, we would get informed - as this + // disturbs the general MIOP assumptions of not being + // interested in any network failures, we ignore ICMP messages. + return 0; +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h new file mode 100644 index 00000000000..406fa8a84c9 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h @@ -0,0 +1,150 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file UIPMC_Mcast_Transport.h + * + * $Id$ + * + * @author Vladimir Zykov <vz@prismtech.com> + */ +//============================================================================= + +#ifndef TAO_UIPMC_MCAST_TRANSPORT_H +#define TAO_UIPMC_MCAST_TRANSPORT_H +#include /**/ "ace/pre.h" + +#include "orbsvcs/PortableGroup/portablegroup_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/Transport.h" + +#include "ace/SOCK_Stream.h" +#include "ace/Svc_Handler.h" +#include "ace/Refcountable_T.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +// Forward decls. +class TAO_ORB_Core; +class TAO_UIPMC_Mcast_Connection_Handler; + +namespace TAO_PG +{ + class UIPMC_Recv_Packet_Cleanup_Guard; + class UIPMC_Recv_Packet; +} + +/** + * @class TAO_UIPMC_Mcast_Transport + * + * @brief Specialization of the base TAO_Transport class to handle the + * server side MIOP protocol. + */ +class TAO_PortableGroup_Export TAO_UIPMC_Mcast_Transport : public TAO_Transport +{ + // This is neccessary on some old compilers such as Studio 9 for SunOS + // to let UIPMC_Recv_Packet_Cleanup_Guard access private cleanup_packets(). + friend class TAO_PG::UIPMC_Recv_Packet_Cleanup_Guard; + +public: + typedef ACE_Hash_Map_Manager<u_long, + TAO_PG::UIPMC_Recv_Packet *, + ACE_SYNCH_NULL_MUTEX> Packets_Map; + + /// Constructor. + TAO_UIPMC_Mcast_Transport (TAO_UIPMC_Mcast_Connection_Handler *handler, + TAO_ORB_Core *orb_core); + + /// Default destructor. + ~TAO_UIPMC_Mcast_Transport (void); + + /// Look for the documentation in Transport.h. + virtual int handle_input (TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time = 0); + +protected: + /** @name Overridden Template Methods + * + * These are implementations of template methods declared by TAO_Transport. + */ + //@{ + virtual ACE_Event_Handler *event_handler_i (void); + virtual TAO_Connection_Handler *connection_handler_i (void); + + /// Write the complete Message_Block chain to the connection. + /// Shouldn't ever be called on the server side. + virtual ssize_t send (iovec *, + int, + size_t &, + ACE_Time_Value const *); + + /// Shouldn't ever be called. We use recv_all() with different semantics. + virtual ssize_t recv (char *, + size_t, + ACE_Time_Value const *); + + virtual int register_handler (void); + +public: + /// @@TODO: These methods IMHO should have more meaningful + /// names. The names seem to indicate nothing. + /// Shouldn't ever be called on the server side. + virtual int send_request (TAO_Stub *, + TAO_ORB_Core *, + TAO_OutputCDR &, + TAO_Message_Semantics, + ACE_Time_Value *); + + /// Shouldn't ever be called on the server side. + virtual int send_message (TAO_OutputCDR &, + TAO_Stub * = 0, + TAO_ServerRequest * = 0, + TAO_Message_Semantics = TAO_Message_Semantics (), + ACE_Time_Value * = 0); + //@} + +private: + /// Receive a UDP message and extract all necessary info from the MIOP + /// header. If everything is fine return a pointer to the first byte of + /// the non-MIOP data. + char *recv_packet (char *buf, size_t len, + ACE_INET_Addr &from_addr, + CORBA::UShort &packet_length, + CORBA::ULong &packet_number, + bool &stop_packet, + u_long &id_hash) const; + + /// Receive as much UDP packets as possible. + bool recv_all (void); + + /// Cleanup either all packets or expired only depending the + /// expired_only flag. + void cleanup_packets (bool expired_only); + +private: + /// The connection service handler used for accessing lower layer + /// communication protocols. + TAO_UIPMC_Mcast_Connection_Handler *connection_handler_; + + /// Incomplete packets. + Packets_Map incomplete_; + + /// A lock for ensuring that only one thread is doing recv. + TAO_SYNCH_MUTEX recv_lock_; + + /// Complete packets. + typedef ACE_Unbounded_Queue<TAO_PG::UIPMC_Recv_Packet *> Packets_Queue; + Packets_Queue complete_; + + /// A lock for access synchronization to complete queue. + TAO_SYNCH_MUTEX complete_lock_; +}; + +TAO_END_VERSIONED_NAMESPACE_DECL + +#include /**/ "ace/post.h" +#endif /* TAO_UIPMC_MCAST_TRANSPORT_H */ diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.cpp index 4e3d4173fcf..6389d1b78ba 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.cpp @@ -7,14 +7,14 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL UIPMC_Message_Block_Data_Iterator::UIPMC_Message_Block_Data_Iterator (iovec *iov, int iovcnt) : iov_ (iov), iovcnt_ (iovcnt), - iov_ptr_ (0), iov_index_ (0), + iov_ptr_ (0), iov_len_left_ (0), state_ (INTER_BLOCK) { } -size_t +bool UIPMC_Message_Block_Data_Iterator::next_block (size_t max_length, iovec &block) { @@ -22,38 +22,35 @@ UIPMC_Message_Block_Data_Iterator::next_block (size_t max_length, { // Check that there are some iovec buffers left. if (this->iov_index_ >= this->iovcnt_) - return 0; + return false; - size_t current_iov_len = - this->iov_[this->iov_index_].iov_len; - - if (current_iov_len <= max_length) + if (this->iov_[this->iov_index_].iov_len <= max_length) { // Return the full data portion. - block.iov_len = static_cast<u_long> (current_iov_len); + block.iov_len = this->iov_[this->iov_index_].iov_len; block.iov_base = this->iov_[this->iov_index_].iov_base; // Go to the next block. - this->iov_index_++; + ++this->iov_index_; - return current_iov_len; + return true; } else { // Let the caller use the first part of this // message block. - block.iov_len = static_cast<u_long> (max_length); + block.iov_len = max_length; block.iov_base = this->iov_[this->iov_index_].iov_base; // Break up the block. - this->iov_len_left_ = current_iov_len - max_length; + this->iov_len_left_ = + this->iov_[this->iov_index_].iov_len - max_length; this->iov_ptr_ = - reinterpret_cast<char *> (reinterpret_cast<char *> (block.iov_base) - + max_length); + reinterpret_cast<char *> (block.iov_base) + max_length; this->state_ = INTRA_BLOCK; - return max_length; + return true; } } else @@ -62,26 +59,27 @@ UIPMC_Message_Block_Data_Iterator::next_block (size_t max_length, if (this->iov_len_left_ <= max_length) { // Return everything that's left in the block. - block.iov_len = static_cast<u_long> (this->iov_len_left_); + block.iov_len = this->iov_len_left_; block.iov_base = this->iov_ptr_; // Go to the next block. - this->iov_index_++; + ++this->iov_index_; // Update the state. this->state_ = INTER_BLOCK; - return this->iov_len_left_; + return true; } else { // Split a little more off the block. - block.iov_len = static_cast<u_long> (this->iov_len_left_); + block.iov_len = max_length; block.iov_base = this->iov_ptr_; this->iov_len_left_ -= max_length; this->iov_ptr_ += max_length; - return max_length; + + return true; } } } diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.h index 0378e1aac7c..0bfffc1e5fc 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.h @@ -38,30 +38,26 @@ public: UIPMC_Message_Block_Data_Iterator (iovec *iov, int iovcnt); /// Get the next data block that has a size less than or equal - /// to max_length. Return the length of the block returned. - size_t next_block (size_t max_length, - iovec &block); + /// to max_length. Return true if there is anything left. + bool next_block (size_t max_length, iovec &block); private: - enum State - { - INTER_BLOCK, - INTRA_BLOCK - }; - iovec *iov_; int iovcnt_; + int iov_index_; // Point internal to a message block, if we have to split one up. char *iov_ptr_; - int iov_index_; // Length used in a split message block. size_t iov_len_left_; // Current message iterator state. - State state_; - + enum State + { + INTER_BLOCK, + INTRA_BLOCK + } state_; }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.cpp index 0bf6969b917..e1df3954d45 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.cpp @@ -14,9 +14,10 @@ static const char the_prefix[] = "miop"; -// UIPMC doesn't support object keys, so send profiles by default in the GIOP 1.2 target -// specification. -static const CORBA::Short default_addressing_mode_ = TAO_Target_Specification::Profile_Addr; +// UIPMC doesn't support object keys, so send profiles by default in the +// GIOP 1.2 target specification. +static const CORBA::Short default_addressing_mode_ = + TAO_Target_Specification::Profile_Addr; TAO_BEGIN_VERSIONED_NAMESPACE_DECL @@ -31,22 +32,38 @@ TAO_UIPMC_Profile::object_key_delimiter (void) const TAO_UIPMC_Profile::TAO_UIPMC_Profile (TAO_ORB_Core *orb_core) : TAO_Profile (IOP::TAG_UIPMC, orb_core, - TAO_GIOP_Message_Version (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR)), - endpoint_ (), - tagged_profile_ () + TAO_GIOP_Message_Version (TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR)) + , endpoint_ () + , tagged_profile_ () + , group_id_ (0) + , ref_version_ (0) + , has_ref_version_ (false) { - addressing_mode_ = default_addressing_mode_; + // The default for component version is 1.0. + this->component_version_.major = 1; + this->component_version_.major = 0; + + this->addressing_mode_ = default_addressing_mode_; } TAO_UIPMC_Profile::TAO_UIPMC_Profile (const ACE_INET_Addr &addr, TAO_ORB_Core *orb_core) : TAO_Profile (IOP::TAG_UIPMC, orb_core, - TAO_GIOP_Message_Version (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR)), - endpoint_ (addr), - tagged_profile_ () + TAO_GIOP_Message_Version (TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR)) + , endpoint_ (addr) + , tagged_profile_ () + , group_id_ (0) + , ref_version_ (0) + , has_ref_version_ (false) { - addressing_mode_ = default_addressing_mode_; + // The default for component version is 1.0. + this->component_version_.major = 1; + this->component_version_.major = 0; + + this->addressing_mode_ = default_addressing_mode_; } TAO_UIPMC_Profile::TAO_UIPMC_Profile (const CORBA::Octet class_d_address[4], @@ -54,30 +71,21 @@ TAO_UIPMC_Profile::TAO_UIPMC_Profile (const CORBA::Octet class_d_address[4], TAO_ORB_Core *orb_core) : TAO_Profile (IOP::TAG_UIPMC, orb_core, - TAO_GIOP_Message_Version (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR)), - endpoint_ (class_d_address, port), - tagged_profile_ () + TAO_GIOP_Message_Version (TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR)) + , endpoint_ (class_d_address, port) + , tagged_profile_ () + , group_id_ (0) + , ref_version_ (0) + , has_ref_version_ (false) { - addressing_mode_ = default_addressing_mode_; -} + // The default for component version is 1.0. + this->component_version_.major = 1; + this->component_version_.major = 0; -/* - -TAO_UIPMC_Profile::TAO_UIPMC_Profile (const char *string, - TAO_ORB_Core *orb_core) - : TAO_Profile (TAO_TAG_UIPMC_PROFILE, - orb_core, - TAO_GIOP_Message_Version (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR)), - endpoint_ (), - tagged_profile_ () -{ - this->add_group_component (); - this->parse_string (string); - addressing_mode_ = default_addressing_mode_; + this->addressing_mode_ = default_addressing_mode_; } -*/ - TAO_UIPMC_Profile::~TAO_UIPMC_Profile (void) { } @@ -91,17 +99,32 @@ TAO_UIPMC_Profile::decode (TAO_InputCDR& cdr) // Read and verify major, minor versions, ignoring profiles // whose versions we don't understand. - if (!(cdr.read_octet (this->version_.major) - && this->version_.major == TAO_DEF_GIOP_MAJOR - && cdr.read_octet (this->version_.minor) - && this->version_.minor <= TAO_DEF_GIOP_MINOR)) + CORBA::Octet major; + CORBA::Octet minor; + if (!cdr.read_octet (major) || !cdr.read_octet (minor)) { - if (TAO_debug_level > 0) + if (TAO_debug_level) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::decode - v%d.%d\n"), - this->version_.major, - this->version_.minor)); + ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::decode, ") + ACE_TEXT ("can't read version %d.%d\n"), + major, + minor)); + } + + return -1; + } + + if (major > TAO_DEF_GIOP_MAJOR || + (major == TAO_DEF_GIOP_MAJOR && minor > TAO_DEF_GIOP_MINOR)) + { + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::decode, ") + ACE_TEXT ("unsupported version %d.%d\n"), + major, + minor)); } return -1; @@ -124,7 +147,8 @@ TAO_UIPMC_Profile::decode (TAO_InputCDR& cdr) // If there is extra data in the profile we are supposed to // ignore it, but print a warning just in case... ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%d bytes out of %d left after profile data\n"), + ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::decode, %d bytes ") + ACE_TEXT ("out of %u left after profile data\n"), cdr.length (), encap_len)); } @@ -144,14 +168,14 @@ TAO_UIPMC_Profile::decode_endpoints (void) int TAO_UIPMC_Profile::decode_profile (TAO_InputCDR& cdr) { - CORBA::UShort port = 0; + CORBA::UShort port = 0u; ACE_CString address; - if (!(cdr.read_string (address) - && cdr.read_ushort (port))) + if (!cdr.read_string (address) || + !cdr.read_ushort (port) ) { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::decode - ") + if (TAO_debug_level) + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::decode, ") ACE_TEXT ("Couldn't unmarshal address and port!\n"))); return -1; } @@ -190,8 +214,12 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) ACE_OS::ace_isdigit (string [2]) && string[3] == '@') { - if (string[0] != '1' || - string[2] != '0') + char const + major= string[0] - '0', + minor= string[2] - '0'; + + if (major > TAO_DEF_MIOP_MAJOR || + (major == TAO_DEF_MIOP_MAJOR && minor > TAO_DEF_MIOP_MINOR)) { throw CORBA::INV_OBJREF ( CORBA::SystemException::_tao_minor_code ( @@ -212,27 +240,14 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) // // Parse the group component version. + GIOP::Version component_version; if (ACE_OS::ace_isdigit (string [0]) && string[1] == '.' && ACE_OS::ace_isdigit (string [2]) && string[3] == '-') { - CORBA::Char major; - CORBA::Char minor; - - major = (char) (string [0] - '0'); - minor = (char) (string [2] - '0'); - - // Verify that a supported version of MIOP is specified. - if (major != TAO_DEF_MIOP_MAJOR || - minor > TAO_DEF_MIOP_MINOR) - { - throw CORBA::INV_OBJREF ( - CORBA::SystemException::_tao_minor_code ( - TAO::VMCID, - EINVAL), - CORBA::COMPLETED_NO); - } + component_version.major = string [0] - '0'; + component_version.minor = string [2] - '0'; // Skip over "N.n-" string += 4; @@ -274,13 +289,13 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) string = pos + 1; pos = ACE_OS::strchr (string, '-'); - CORBA::Boolean parse_group_ref_version_flag = 0; + bool parse_group_ref_version_flag = false; if (pos != 0) { // String was terminated by a '-', so there's a group // reference version to be parsed. - parse_group_ref_version_flag = 1; + parse_group_ref_version_flag = true; } else { @@ -316,7 +331,7 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) PortableGroup::ObjectGroupId group_id = ACE_OS::strtoull (str_group_id.c_str (), 0, 10); - this->has_ref_version_ = false; + bool has_ref_version = false; PortableGroup::ObjectGroupRefVersion ref_version = 0; if (parse_group_ref_version_flag) { @@ -349,7 +364,7 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) ref_version = ACE_OS::strtoul (str_group_ref_ver.c_str (), 0, 10); - this->has_ref_version_ = true; + has_ref_version = true; } // Parse the group multicast address. @@ -381,8 +396,9 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) #if defined (ACE_HAS_IPV6) // Check if this is a (possibly) IPv6 supporting profile containing a // decimal IPv6 address representation. - if ((this->version ().major > TAO_MIN_IPV6_IIOP_MAJOR || - this->version ().minor >= TAO_MIN_IPV6_IIOP_MINOR) && + if ( (this->version ().major > TAO_MIN_IPV6_IIOP_MAJOR || + (this->version ().major == TAO_MIN_IPV6_IIOP_MAJOR && + this->version ().minor >= TAO_MIN_IPV6_IIOP_MINOR ) ) && string[0] == '[') { // In this case we have to find the end of the numeric address and @@ -391,11 +407,12 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) if (pos == 0) { // No valid IPv6 address specified. - if (TAO_debug_level > 0) + if (TAO_debug_level) { ACE_ERROR ((LM_ERROR, - ACE_TEXT ("\nTAO (%P|%t) - UIPMC_Profile: ") - ACE_TEXT ("Invalid IPv6 decimal address specified.\n"))); + ACE_TEXT ("\nTAO (%P|%t) - UIPMC_Profile::") + ACE_TEXT ("parse_string_i, Invalid IPv6 ") + ACE_TEXT ("decimal address specified.\n"))); } throw CORBA::INV_OBJREF ( @@ -412,13 +429,11 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) } } else - { #endif /* ACE_HAS_IPV6 */ + { mcast_addr = ACE_CString (string, pos - string); string = pos + 1; -#if defined (ACE_HAS_IPV6) } -#endif /* ACE_HAS_IPV6 */ size_t mcast_addr_len = mcast_addr.length (); if (ACE_OS::strspn (mcast_addr.c_str (), @@ -447,7 +462,7 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) } // Port can have name thus letters and '-' are allowed. - const char port_chars[] = + static const char port_chars[] = "-0123456789ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; size_t port_len = ACE_OS::strlen (string); if (ACE_OS::strspn (string, port_chars) != port_len) @@ -479,8 +494,10 @@ TAO_UIPMC_Profile::parse_string_i (const char *string) ACE_INET_Addr addr (mcast_port, mcast_addr.c_str ()); this->endpoint_.object_addr (addr); - this->set_group_info (group_domain_id.c_str (), + this->set_group_info (component_version, + group_domain_id.c_str (), group_id, + has_ref_version, ref_version); if (this->orb_core ()->orb_params ()->preferred_interfaces ()) @@ -507,7 +524,7 @@ TAO_UIPMC_Profile::hash (CORBA::ULong max) // Get the hashvalue for all endpoints. CORBA::ULong hashval = this->endpoint_.hash (); - hashval += this->version_.minor; + hashval += TAO_DEF_MIOP_MINOR; hashval += this->tag (); return hashval % max; @@ -534,15 +551,15 @@ TAO_UIPMC_Profile::endpoint_count (void) const char * TAO_UIPMC_Profile::to_string (void) { - // corbaloc:miop:1.2@1.0-group_id-1-1/host:port + // corbaloc:miop:1.0@1.0-group_id-1-1/host:port size_t buflen = (8 /* "corbaloc" */ + 1 /* colon separator */ + ACE_OS::strlen (::the_prefix) + /* "miop" */ 1 /* colon separator */ + - 1 /* major version */ + + 1 /* miop major version */ + 1 /* decimal point */ + - 1 /* minor version */ + + 1 /* miop minor version */ + 1 /* `@' character */ + 1 /* component major version */ + 1 /* decimal point */ + @@ -570,10 +587,10 @@ TAO_UIPMC_Profile::to_string (void) "corbaloc:%s:%c.%c@%c.%c-%s-" ACE_UINT64_FORMAT_SPECIFIER_ASCII, ::the_prefix, - digits [this->version_.major], - digits [this->version_.minor], digits [TAO_DEF_MIOP_MAJOR], digits [TAO_DEF_MIOP_MINOR], + digits [this->component_version_.major], + digits [this->component_version_.minor], this->group_domain_id_.c_str (), this->group_id_); @@ -651,90 +668,30 @@ TAO_UIPMC_Profile::create_tagged_profile (void) void TAO_UIPMC_Profile::create_profile_body (TAO_OutputCDR &encap) const { - encap.write_octet (TAO_ENCAP_BYTE_ORDER); - - // The GIOP version - // Note: Only GIOP 1.2 and above are supported currently for MIOP. - encap.write_octet (this->version_.major); - encap.write_octet (this->version_.minor); - - // Address. - encap.write_string (this->endpoint_.host ()); - - // Port number. - encap.write_ushort (this->endpoint_.port ()); + this->encodeAddressInfo (encap); // UIPMC is only supported by versions of GIOP that have tagged components, // so unconditionally encode the components. this->tagged_components ().encode (encap); } -/* -int -TAO_UIPMC_Profile::decode_endpoints (void) -{ - IOP::TaggedComponent tagged_component; - tagged_component.tag = TAO_TAG_ENDPOINTS; - - if (this->tagged_components_.get_component (tagged_component)) - { - const CORBA::Octet *buf = - tagged_component.component_data.get_buffer (); - - TAO_InputCDR in_cdr (reinterpret_cast<const char*> (buf), - tagged_component.component_data.length ()); - - // Extract the Byte Order. - CORBA::Boolean byte_order; - if ((in_cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0) - return -1; - in_cdr.reset_byte_order (static_cast<int> (byte_order)); - - // Extract endpoints sequence. - TAO_UIPMCEndpointSequence endpoints; - - if ((in_cdr >> endpoints) == 0) - return -1; - - // Get the priority of the first endpoint (head of the list. - // It's other data is extracted as part of the standard profile - // decoding. - this->endpoint_.priority (endpoints[0].priority); - - // Use information extracted from the tagged component to - // populate the profile. Skip the first endpoint, since it is - // always extracted through standard profile body. Also, begin - // from the end of the sequence to preserve endpoint order, - // since <add_endpoint> method reverses the order of endpoints - // in the list. - for (CORBA::ULong i = endpoints.length () - 1; - i > 0; - --i) - { - TAO_UIPMC_Endpoint *endpoint = 0; - ACE_NEW_RETURN (endpoint, - TAO_UIPMC_Endpoint (endpoints[i].host, - endpoints[i].port, - endpoints[i].priority), - -1); - - this->add_endpoint (endpoint); - } - } - - return 0; -} -*/ - void -TAO_UIPMC_Profile::set_group_info (const char *domain_id, - PortableGroup::ObjectGroupId group_id, - PortableGroup::ObjectGroupRefVersion ref_version) +TAO_UIPMC_Profile::set_group_info ( + GIOP::Version const &component_version, + const char *domain_id, + PortableGroup::ObjectGroupId group_id, + bool has_ref_version, + PortableGroup::ObjectGroupRefVersion ref_version) { // First, record the group information. + this->component_version_ = component_version; this->group_domain_id_.set (domain_id); this->group_id_ = group_id; - this->ref_version_ = ref_version; + this->has_ref_version_ = has_ref_version; + if (has_ref_version) + { + this->ref_version_ = ref_version; + } // Update the cached version of the group component. this->update_cached_group_component (); @@ -746,8 +703,8 @@ TAO_UIPMC_Profile::update_cached_group_component (void) PortableGroup::TagGroupTaggedComponent group; // Encode the data structure. - group.component_version.major = TAO_DEF_MIOP_MAJOR; - group.component_version.minor = TAO_DEF_MIOP_MINOR; + group.component_version.major = this->component_version_.major; + group.component_version.minor = this->component_version_.minor; group.group_domain_id = CORBA::string_dup (this->group_domain_id_.c_str ()); group.object_group_id = this->group_id_; @@ -761,8 +718,11 @@ TAO_UIPMC_Profile::update_cached_group_component (void) // Write the group information. if ((out_cdr << group) == 0) { - ACE_DEBUG ((LM_DEBUG, - "Error marshaling group component!")); + if (TAO_debug_level) + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::") + ACE_TEXT ("update_cached_group_component, Error ") + ACE_TEXT ("marshaling group component!"))); return; } @@ -791,19 +751,17 @@ TAO_UIPMC_Profile::update_cached_group_component (void) void TAO_UIPMC_Profile::request_target_specifier ( - TAO_Target_Specification &target_spec, - TAO_Target_Specification::TAO_Target_Address required_type) + TAO_Target_Specification &target_spec, + TAO_Target_Specification::TAO_Target_Address required_type) { // Fill out the target specifier based on the required type. switch (required_type) { case TAO_Target_Specification::Profile_Addr: - // Only using a profile as the target specifier is supported // at this time. Object keys are strictly not supported since // UIPMC profiles do not have object keys. - target_spec.target_specifier ( - this->create_tagged_profile ()); + target_spec.target_specifier (this->create_tagged_profile ()); break; case TAO_Target_Specification::Key_Addr: @@ -845,14 +803,18 @@ TAO_UIPMC_Profile::addressing_mode (CORBA::Short addr_mode) } int -TAO_UIPMC_Profile::extract_group_component (const IOP::TaggedProfile &profile, - PortableGroup::TagGroupTaggedComponent &group) +TAO_UIPMC_Profile::extract_group_component ( + const IOP::TaggedProfile &profile, + PortableGroup::TagGroupTaggedComponent &group) { // Create the decoding stream from the encapsulation in the buffer, //#if (TAO_NO_COPY_OCTET_SEQUENCES == 1) // TAO_InputCDR cdr (profile.profile_data.mb ()); //#else - TAO_InputCDR cdr (reinterpret_cast<const char*> (profile.profile_data.get_buffer ()), + CORBA::Octet const *buf = + profile.profile_data.get_buffer (); + + TAO_InputCDR cdr (reinterpret_cast<const char*> (buf), profile.profile_data.length ()); //#endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */ @@ -865,33 +827,34 @@ TAO_UIPMC_Profile::extract_group_component (const IOP::TaggedProfile &profile, // Read and verify major, minor versions, ignoring UIPMC profiles // whose versions we don't understand. CORBA::Octet major; - CORBA::Octet minor = CORBA::Octet(); + CORBA::Octet minor; // Read the version. We just read it here. We don't*do any* // processing. - if (!(cdr.read_octet (major) - && cdr.read_octet (minor))) - { - if (TAO_debug_level > 0) - { + if (!cdr.read_octet (major) || + !cdr.read_octet (minor) ) + { + if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::extract_group_component - v%d.%d\n"), + ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::") + ACE_TEXT ("extract_group_component, version %d.%d\n"), major, minor)); - } - return -1; - } + return -1; + } // Decode the endpoint. ACE_CString address; CORBA::UShort port; - if (!(cdr.read_string (address) - && cdr.read_ushort (port))) + if (!cdr.read_string (address) || + !cdr.read_ushort (port) ) { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::extract_group_component - Couldn't unmarshal address and port!\n"))); + if (TAO_debug_level) + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Profile::") + ACE_TEXT ("extract_group_component, Couldn't ") + ACE_TEXT ("unmarshal address and port!\n"))); return -1; } @@ -907,8 +870,7 @@ TAO_UIPMC_Profile::extract_group_component (const IOP::TaggedProfile &profile, return -1; // Found it. - const CORBA::Octet *buf = - tagged_component.component_data.get_buffer (); + buf = tagged_component.component_data.get_buffer (); TAO_InputCDR in_cdr (reinterpret_cast<const char*> (buf), tagged_component.component_data.length ()); @@ -924,4 +886,20 @@ TAO_UIPMC_Profile::extract_group_component (const IOP::TaggedProfile &profile, return 0; } +void +TAO_UIPMC_Profile::encodeAddressInfo (TAO_OutputCDR &encap) const +{ + encap.write_octet (TAO_ENCAP_BYTE_ORDER); + + // The MIOP version + encap.write_octet (TAO_DEF_MIOP_MAJOR); + encap.write_octet (TAO_DEF_MIOP_MINOR); + + // Address. + encap.write_string (this->endpoint_.host ()); + + // Port number. + encap.write_ushort (this->endpoint_.port ()); +} + TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.h index c2d227d00d8..994af6d4274 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Profile.h @@ -72,10 +72,6 @@ public: CORBA::UShort port, TAO_ORB_Core *orb_core); - /// Create object using a string ior. - TAO_UIPMC_Profile (const char *string, - TAO_ORB_Core *orb_core); - /// Destructor is to be called only through _decr_refcnt. ~TAO_UIPMC_Profile (void); @@ -84,28 +80,29 @@ public: /// N.B. We have to override the TAO_Profile default decode because /// in UIPMC there is no object key marshalled and we do not implement /// a useable decode_endpoints - virtual int decode (TAO_InputCDR& cdr); + virtual int decode (TAO_InputCDR &cdr); virtual void parse_string (const char *string); virtual char * to_string (void); virtual int encode_endpoints (void); + virtual void encodeAddressInfo (TAO_OutputCDR &stream) const; virtual TAO_Endpoint *endpoint (void); virtual CORBA::ULong endpoint_count (void) const; virtual CORBA::ULong hash (CORBA::ULong max); virtual IOP::TaggedProfile &create_tagged_profile (void); virtual void request_target_specifier ( - TAO_Target_Specification &target_spec, - TAO_Target_Specification::TAO_Target_Address r); + TAO_Target_Specification &target_spec, + TAO_Target_Specification::TAO_Target_Address r); virtual int supports_multicast (void) const; virtual void addressing_mode (CORBA::Short addr_mode); static int extract_group_component (const IOP::TaggedProfile &profile, PortableGroup::TagGroupTaggedComponent &group); /// Add the mandatory group component to this profile. - void set_group_info (const char *domain_id, + void set_group_info (GIOP::Version const &component_version, + const char *domain_id, PortableGroup::ObjectGroupId group_id, + bool has_ref_version, PortableGroup::ObjectGroupRefVersion ref_version); - - protected: /// Template methods, please see documentation in tao/Profile.h virtual int decode_profile (TAO_InputCDR& cdr); @@ -116,7 +113,6 @@ protected: virtual void update_cached_group_component (void); protected: - /** * Head of this profile's list of endpoints. This endpoint is not * dynamically allocated because a profile always contains at least @@ -135,10 +131,12 @@ protected: TAO_UIPMC_Endpoint endpoint_; private: - /// Cached version of our tagged profile. IOP::TaggedProfile tagged_profile_; + /// Group component version. + GIOP::Version component_version_; + /// Group Domain ID. ACE_CString group_domain_id_; diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp index e6b58a05a60..c3c581070c6 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp @@ -1,533 +1,428 @@ // $Id$ -#ifndef TAO_UIPMC_TRANSPORT_CPP -#define TAO_UIPMC_TRANSPORT_CPP - -#include "orbsvcs/PortableGroup/UIPMC_Profile.h" #include "orbsvcs/PortableGroup/UIPMC_Transport.h" +#include "orbsvcs/PortableGroup/miopconf.h" +#include "orbsvcs/PortableGroup/UIPMC_Connection_Handler.h" #include "orbsvcs/PortableGroup/UIPMC_Message_Block_Data_Iterator.h" #include "orbsvcs/PortableGroup/UIPMC_Wait_Never.h" +#include "orbsvcs/PortableGroup/miop_resource.h" -#include "tao/Acceptor_Registry.h" -#include "tao/operation_details.h" -#include "tao/Timeprobe.h" -#include "tao/CDR.h" -#include "tao/Transport_Mux_Strategy.h" -#include "tao/Wait_Strategy.h" -#include "tao/Stub.h" #include "tao/ORB_Core.h" #include "tao/debug.h" -#include "tao/Resume_Handle.h" #include "tao/GIOP_Message_Base.h" -// Local MIOP Definitions: - -// Note: We currently support packet fragmentation on transmit, but -// do not support reassembly. - -// Limit the number of fragments that we can divide a message -// into. -#define MIOP_MAX_FRAGMENTS (1) -#define MIOP_MAX_HEADER_SIZE (272) // See MIOP Spec. Must be a multiple of 8. -#define MIOP_MAX_DGRAM_SIZE (ACE_MAX_UDP_PACKET_SIZE) - -#define MIOP_MAGIC_OFFSET (0) -#define MIOP_VERSION_OFFSET (4) -#define MIOP_FLAGS_OFFSET (5) -#define MIOP_PACKET_LENGTH_OFFSET (6) -#define MIOP_PACKET_NUMBER_OFFSET (8) -#define MIOP_NUMBER_OF_PACKETS_OFFSET (12) -#define MIOP_ID_LENGTH_OFFSET (16) -#define MIOP_MIN_LENGTH_ID (0) -#define MIOP_MAX_LENGTH_ID (252) -#define MIOP_ID_DEFAULT_LENGTH (12) -#define MIOP_ID_CONTENT_OFFSET (20) -#define MIOP_HEADER_PADDING (0) // The ID field needs to be padded to - // a multiple of 8 bytes. -#define MIOP_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET \ - + MIOP_ID_DEFAULT_LENGTH \ - + MIOP_HEADER_PADDING) -#define MIOP_MIN_HEADER_SIZE (MIOP_ID_CONTENT_OFFSET \ - + MIOP_MIN_LENGTH_ID \ - + (8 - MIOP_MIN_LENGTH_ID) /* padding */) - -static const CORBA::Octet miop_magic[4] = { 0x4d, 0x49, 0x4f, 0x50 }; // 'M', 'I', 'O', 'P' +#include "ace/UUID.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL -struct MIOP_Packet -{ - iovec iov[ACE_IOV_MAX]; - int iovcnt; - int length; -}; - -template<typename CONNECTION_HANDLER> -TAO_UIPMC_Transport<CONNECTION_HANDLER>::TAO_UIPMC_Transport ( - CONNECTION_HANDLER *handler, +TAO_UIPMC_Transport::TAO_UIPMC_Transport ( + TAO_UIPMC_Connection_Handler *handler, TAO_ORB_Core *orb_core ) - : TAO_Transport (IOP::TAG_UIPMC, - orb_core, - MIOP_MAX_DGRAM_SIZE) + : TAO_Transport (IOP::TAG_UIPMC, orb_core) , connection_handler_ (handler) + , total_bytes_outstanding_ (0u) + , time_last_sent_ (ACE_Time_Value::zero) { // Replace the default wait strategy with our own // since we don't support waiting on anything. delete this->ws_; - ACE_NEW (this->ws_, - TAO_UIPMC_Wait_Never (this)); + ACE_NEW (this->ws_, TAO_UIPMC_Wait_Never (this)); + + ACE_Utils::UUID uuid; + ACE_Utils::UUID_GENERATOR::instance ()->generate_UUID (uuid); + + // This ID is "globally" unique. + this->uuid_hash_ = uuid.to_string ()->hash (); } -template<typename CONNECTION_HANDLER> -TAO_UIPMC_Transport<CONNECTION_HANDLER>::~TAO_UIPMC_Transport (void) +TAO_UIPMC_Transport::~TAO_UIPMC_Transport (void) { } -template<typename CONNECTION_HANDLER> ACE_Event_Handler * -TAO_UIPMC_Transport<CONNECTION_HANDLER>::event_handler_i (void) +TAO_UIPMC_Transport::event_handler_i (void) { return this->connection_handler_; } -template<typename CONNECTION_HANDLER> TAO_Connection_Handler * -TAO_UIPMC_Transport<CONNECTION_HANDLER>::connection_handler_i (void) +TAO_UIPMC_Transport::connection_handler_i (void) { return this->connection_handler_; } -template<typename CONNECTION_HANDLER> -void -TAO_UIPMC_Transport<CONNECTION_HANDLER>::write_unique_id (TAO_OutputCDR &miop_hdr, - unsigned long unique) -{ - // We currently construct a unique ID for each MIOP message by - // concatenating the address of the buffer to a counter. We may - // also need to use a MAC address or something more unique to - // fully comply with the MIOP specification. - - static unsigned long counter = 1; // Don't worry about race conditions on counter, - // since buffer addresses can't be the same if - // this is being called simultaneously. - - CORBA::Octet unique_id[MIOP_ID_DEFAULT_LENGTH]; - - unique_id[0] = static_cast<CORBA::Octet> (unique & 0xff); - unique_id[1] = static_cast<CORBA::Octet> ((unique & 0xff00) >> 8); - unique_id[2] = static_cast<CORBA::Octet> ((unique & 0xff0000) >> 16); - unique_id[3] = static_cast<CORBA::Octet> ((unique & 0xff000000) >> 24); - - unique_id[4] = static_cast<CORBA::Octet> (counter & 0xff); - unique_id[5] = static_cast<CORBA::Octet> ((counter & 0xff00) >> 8); - unique_id[6] = static_cast<CORBA::Octet> ((counter & 0xff0000) >> 16); - unique_id[7] = static_cast<CORBA::Octet> ((counter & 0xff000000) >> 24); - - unique_id[8] = 0; - unique_id[9] = 0; - unique_id[10] = 0; - unique_id[11] = 0; - - miop_hdr.write_ulong (MIOP_ID_DEFAULT_LENGTH); - miop_hdr.write_octet_array (unique_id, MIOP_ID_DEFAULT_LENGTH); -} - -template<typename CONNECTION_HANDLER> -ssize_t -TAO_UIPMC_Transport<CONNECTION_HANDLER>::send (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *) +bool +TAO_UIPMC_Transport::write_unique_id ( + TAO_OutputCDR &miop_hdr) const { - const ACE_INET_Addr &addr = this->connection_handler_->addr (); - bytes_transferred = 0; - - // Calculate the bytes to send. This value is only used for - // error conditions to fake a good return. We do this for - // semantic consistency with DIOP, and since errors aren't - // handled correctly from send_i (our fault). If these - // semantics are not desirable, the error handling problems - // that need to be fixed can be found in - // UIPMC_Connection_Handler::decr_refcount which will need to - // deregister the connection handler from the UIPMC_Connector - // cache. - ssize_t bytes_to_send = 0; - for (int i = 0; i < iovcnt; i++) - bytes_to_send += iov[i].iov_len; - - MIOP_Packet fragments[MIOP_MAX_FRAGMENTS]; - MIOP_Packet *current_fragment = 0; - int num_fragments = 1; + // This is unique within single machine. + unsigned long pid = static_cast<unsigned long> (ACE_OS::getpid ()); - UIPMC_Message_Block_Data_Iterator mb_iter (iov, iovcnt); + // This is unique within single process. + static ACE_Atomic_Op<TAO_SYNCH_MUTEX, unsigned long> cnt = 0; + unsigned long const id = ++cnt; - // Initialize the first fragment - current_fragment = &fragments[0]; - current_fragment->iovcnt = 1; // The MIOP Header - current_fragment->length = 0; + CORBA::Octet unique_id[MIOP_DEFAULT_ID_LENGTH]; - // Go through all of the message blocks. - while (mb_iter.next_block (MIOP_MAX_DGRAM_SIZE - current_fragment->length, - current_fragment->iov[current_fragment->iovcnt])) - { - // Increment the length and iovcnt. - current_fragment->length += current_fragment->iov[current_fragment->iovcnt].iov_len; - current_fragment->iovcnt++; - - // Check if we've filled up this fragment or if we've run out of - // iov entries. - if (current_fragment->length == MIOP_MAX_DGRAM_SIZE || - current_fragment->iovcnt == ACE_IOV_MAX) - { - // Make a new fragment. - ++num_fragments; +#define UIPMC_OCTET(value) static_cast<CORBA::Octet> ((value) & 0xff) - // Check if too many fragments - if (num_fragments > MIOP_MAX_FRAGMENTS) - { - // This is an error as we do not send more. - // Silently drop the message but log an error. - - // Pluggable_Messaging::transport_message only - // cares if it gets -1 or 0 so we can return a - // partial length and it will think all has gone - // well. - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("\n\nTAO (%P|%t) - ") - ACE_TEXT ("UIPMC_Transport::send ") - ACE_TEXT ("Message of size %d needs too many MIOP fragments (max is %d).\n"), - bytes_to_send, - MIOP_MAX_FRAGMENTS - )); -#if MIOP_MAX_DGRAM_SIZE < ACE_MAX_UDP_PACKET_SIZE - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("You may be able to increase MIOP_MAX_DGRAM_SIZE.\n"))); -#endif - } - - // Pretend it is o.k. See note by bytes_to_send calculation. - bytes_transferred = bytes_to_send; - return 1; - } + // Use the lowest 4 bytes. In case of IPv6 it gives more diversity. + unique_id[0] = UIPMC_OCTET (this->uuid_hash_); + unique_id[1] = UIPMC_OCTET (this->uuid_hash_ >> 8); + unique_id[2] = UIPMC_OCTET (this->uuid_hash_ >> 16); + unique_id[3] = UIPMC_OCTET (this->uuid_hash_ >> 24); - // Otherwise, initialize another fragment. - ++current_fragment; - current_fragment->iovcnt = 1; // The MIOP Header - current_fragment->length = 0; - } - } + unique_id[4] = UIPMC_OCTET (pid); + unique_id[5] = UIPMC_OCTET (pid >> 8); + unique_id[6] = UIPMC_OCTET (pid >> 16); + unique_id[7] = UIPMC_OCTET (pid >> 24); - // Build a generic MIOP Header. + unique_id[8] = UIPMC_OCTET (id); + unique_id[9] = UIPMC_OCTET (id >> 8); + unique_id[10] = UIPMC_OCTET (id >> 16); + unique_id[11] = UIPMC_OCTET (id >> 24); - // Allocate space on the stack for the header (add 8 to account for - // the possibility of adjusting for alignment). - char header_buffer[MIOP_HEADER_SIZE + 8]; - TAO_OutputCDR miop_hdr (header_buffer, MIOP_HEADER_SIZE + 8); + miop_hdr.write_ulong (MIOP_DEFAULT_ID_LENGTH); + miop_hdr.write_octet_array (unique_id, MIOP_DEFAULT_ID_LENGTH); - miop_hdr.write_octet_array (miop_magic, 4); // Magic - miop_hdr.write_octet (0x10); // Version - CORBA::Octet *flags_field = reinterpret_cast<CORBA::Octet *> (miop_hdr.current ()->wr_ptr ()); - - // Write flags octet: - // Bit Description - // 0 Endian - // 1 Stop message flag (Assigned later) - // 2 - 7 Set to 0 - miop_hdr.write_octet (TAO_ENCAP_BYTE_ORDER); // Flags - - // Packet Length - // NOTE: We can save pointers and write them later without byte swapping since - // in CORBA, the sender chooses the endian. - CORBA::UShort *packet_length = reinterpret_cast<CORBA::UShort *> (miop_hdr.current ()->wr_ptr ()); - miop_hdr.write_short (0); - - // Packet number - CORBA::ULong *packet_number = reinterpret_cast<CORBA::ULong *> (miop_hdr.current ()->wr_ptr ()); - miop_hdr.write_ulong (0); - - // Number of packets field - miop_hdr.write_ulong (num_fragments); - - // UniqueId - ptrdiff_t unique_id = reinterpret_cast<ptrdiff_t> (iov); - this->write_unique_id (miop_hdr, - static_cast<unsigned long> (unique_id)); + return miop_hdr.good_bit (); +} - // Send the buffers. - current_fragment = &fragments[0]; - while (num_fragments > 0 && - current_fragment->iovcnt > 1) +void +TAO_UIPMC_Transport::throttle_send_rate ( + ACE_UINT64 const max_fragment_rate, + u_long const max_fragment_size, + u_long const this_fragment_size) +{ + ACE_Time_Value const now = ACE_OS::gettimeofday (); + if (this->total_bytes_outstanding_) { - // Fill in the packet length header field. - *packet_length = static_cast<CORBA::UShort> (current_fragment->length); - - // If this is the last fragment, set the stop message flag. - if (num_fragments == 1) + // How much of the previously sent data have we had time to send? + ACE_Time_Value const how_long = now - this->time_last_sent_; + ACE_UINT64 how_long_in_micro_seconds = 0u; + how_long.to_usec (how_long_in_micro_seconds); + + ACE_UINT64 const octets_processed = + how_long_in_micro_seconds * max_fragment_size / max_fragment_rate; + if (this->total_bytes_outstanding_ <= octets_processed) { - *flags_field |= 0x02; + if (2 <= TAO_debug_level) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::") + ACE_TEXT ("throttle_send_rate, Previous data ") + ACE_TEXT ("(%u bytes) has cleared ") + ACE_TEXT ("(could have sent %Q bytes over ") + ACE_TEXT ("the last %Q uSecs)\n"), + this->id (), + this->total_bytes_outstanding_, + octets_processed, + how_long_in_micro_seconds)); + this->total_bytes_outstanding_ = 0u; } - - // Setup the MIOP header in the iov list. - current_fragment->iov[0].iov_base = miop_hdr.current ()->rd_ptr (); - current_fragment->iov[0].iov_len = MIOP_HEADER_SIZE; - - // Send the fragment. - Need to check for errors!! - ssize_t rc = this->connection_handler_->send (current_fragment->iov, - current_fragment->iovcnt, - addr); - - if (rc <= 0) + else { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("\n\nTAO (%P|%t) - ") - ACE_TEXT ("UIPMC_Transport::send to %C (port %u)") - ACE_TEXT (" %p\n\n"), - addr.get_host_addr (), - addr.get_port_number (), - ACE_TEXT ("Error returned from transport:"))); - } - - // Pretend it is o.k. See note by bytes_to_send calculation. - bytes_transferred = bytes_to_send; - return 1; + if (2 <= TAO_debug_level) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::") + ACE_TEXT ("throttle_send_rate, Previous data ") + ACE_TEXT ("(%u bytes) has reduced by %Q bytes ") + ACE_TEXT ("over the last %Q uSecs\n"), + this->id (), + this->total_bytes_outstanding_, + octets_processed, + how_long_in_micro_seconds)); + this->total_bytes_outstanding_ -= + static_cast<u_long> (octets_processed); } + } + this->time_last_sent_ = now; - // Increment the number of bytes transferred, but don't - // count the MIOP header that we added. - bytes_transferred += rc - MIOP_HEADER_SIZE; - - if (TAO_debug_level > 0) + if (this->total_bytes_outstanding_) + { + // Is this fragment enough to exceed the capacity of the client + // and/or server system socket buffers? + u_long const + new_total_send = this->total_bytes_outstanding_ + this_fragment_size, + hwm = this->connection_handler_->send_hi_water_mark (); + if (new_total_send > hwm) { - ACE_DEBUG ((LM_DEBUG, - "TAO_UIPMC_Transport::send: sent %d bytes to %s:%d\n", - rc, - addr.get_host_addr (), - addr.get_port_number ())); + // Definatly need to throttle back and delay before sending + // this new packet. How much extra data are we trying to fit + // into the various socket buffers? + u_long bytes_over_by = new_total_send - hwm; + + // We need to wait until existing "bytes_over_by" data has been + // transmitted/processed; however since we only have + // "total_bytes_outstanding" actualy in progress at the moment, + // this is the maximum amount we can wait for. (This can happen + // if the High Water Mark is set less than the "max_fragment_size" + // and means we can initialy exceed the HWM; will simply wait + // longer before the next fragment, after this current one.) + if (bytes_over_by > this->total_bytes_outstanding_) + bytes_over_by = this->total_bytes_outstanding_; + + // How much time does it take for "bytes_over_by" data to be + // processed? + ACE_UINT64 const delay_in_micro_seconds= + bytes_over_by * max_fragment_rate / max_fragment_size; + ACE_Time_Value const delay ( + static_cast <time_t> (delay_in_micro_seconds / ACE_ONE_SECOND_IN_USECS), + static_cast <suseconds_t> (delay_in_micro_seconds % ACE_ONE_SECOND_IN_USECS)); + + if (TAO_debug_level) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::") + ACE_TEXT ("throttle_send_rate, SendHighWaterMark ") + ACE_TEXT ("(%u) exceeded by %u bytes, delaying ") + ACE_TEXT ("for %Q uSecs\n"), + this->id (), + hwm, + bytes_over_by, + delay_in_micro_seconds)); + + ACE_OS::sleep (delay); } - - // Go to the next fragment. - (*packet_number)++; - ++current_fragment; - --num_fragments; } - - // Return total bytes transferred. - return bytes_transferred; } -template<typename CONNECTION_HANDLER> ssize_t -TAO_UIPMC_Transport<CONNECTION_HANDLER>::recv (char *buf, - size_t len, - const ACE_Time_Value * /*max_wait_time*/) +TAO_UIPMC_Transport::send ( + iovec *iov, + int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *) { - ACE_INET_Addr from_addr; - - ssize_t n = this->connection_handler_->peer ().recv (buf, - len, - from_addr); - if (TAO_debug_level > 5) - { - ACE_DEBUG ((LM_DEBUG, - "TAO_UIPMC_Transport::recv: received %d bytes from %s:%d\n", - n, - from_addr.get_host_addr (), - from_addr.get_port_number ())); - } - - // Make sure that we at least have a MIOP header. - if (n < MIOP_MIN_HEADER_SIZE) + bytes_transferred = 0u; + + // Calculate the total number of bytes being sent. + u_long bytes_to_send = 0u; + for (int i = 0; i < iovcnt; ++i) + bytes_to_send += iov[i].iov_len; + if (!bytes_to_send) + return 0; // Nothing to send, we are done. + + // Determine the number of MIOP packets that we have to send (add one + // if "bytes_to_send" is not a multiple of "max_fragment_payload"). + TAO_MIOP_Resource_Factory *const factory = + ACE_Dynamic_Service<TAO_MIOP_Resource_Factory>::instance ( + this->orb_core_->configuration(), + ACE_TEXT ("MIOP_Resource_Factory")); + u_long const + max_fragment_size= factory->max_fragment_size (), + max_fragment_payload= max_fragment_size - MIOP_DEFAULT_HEADER_SIZE, + number_of_packets_required = + bytes_to_send / max_fragment_payload + + !!(bytes_to_send % max_fragment_payload); + if (factory->max_fragments () && // Any value except Zero (Unlimited) + factory->max_fragments () < number_of_packets_required) { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO_UIPMC_Transport::recv: packet of size %d is too small from %s:%d\n", - n, - from_addr.get_host_addr (), - from_addr.get_port_number ())); - } - return 0; - } + // Pretend it was sent ok, but we are dropping this message, + // it is too large to send. + bytes_transferred = bytes_to_send; - // Check for MIOP magic bytes. - if (buf[MIOP_MAGIC_OFFSET] != miop_magic [0] || - buf[MIOP_MAGIC_OFFSET + 1] != miop_magic [1] || - buf[MIOP_MAGIC_OFFSET + 2] != miop_magic [2] || - buf[MIOP_MAGIC_OFFSET + 3] != miop_magic [3]) - { - if (TAO_debug_level > 0) + if (TAO_debug_level) { ACE_DEBUG ((LM_DEBUG, - "TAO_UIPMC_Transport::recv: UIPMC packet didn't contain magic bytes.\n")); + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::send, ") + ACE_TEXT ("Did not send MIOP message of size %u ") + ACE_TEXT ("(it was too large, needing %u fragments).\n") + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::send, ") + ACE_TEXT ("You maybe able to increase ") + ACE_TEXT ("MIOP_Resource_Factory -ORBMaxFragments %u"), + this->id (), + bytes_to_send, + number_of_packets_required, + this->id (), + factory->max_fragments ())); + + if (max_fragment_size < MIOP_MAX_DGRAM_SIZE) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT (" or -ORBMaxFragmentSize %u\n"), + max_fragment_size)); + else + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("\n"))); } - return 0; + return bytes_transferred; } - // Retrieve the byte order. - // 0 = Big endian - // 1 = Small endian - CORBA::Octet byte_order = buf[MIOP_FLAGS_OFFSET] & 0x01; - - // Ignore the header version, other flags, packet length and number of packets. - - // Get the length of the ID. - CORBA::ULong id_length; -#if !defined (ACE_DISABLE_SWAP_ON_READ) - if (byte_order == ACE_CDR_BYTE_ORDER) - { - id_length = *reinterpret_cast<ACE_CDR::ULong*> (&buf[MIOP_ID_LENGTH_OFFSET]); - } - else + // Build a generic MIOP Header on the stack for this + // "number_of_packets_required". NOTE: We can save + // pointers and write to them later without byte swapping + // since in CORBA, the sender chooses the endianess. + char header_buffer[MIOP_DEFAULT_HEADER_SIZE + ACE_CDR::MAX_ALIGNMENT]; + TAO_OutputCDR miop_hdr (header_buffer, sizeof header_buffer); + miop_hdr.write_octet_array (miop_magic, sizeof miop_magic); + // MIOP (Not GIOP) Version + miop_hdr.write_octet (((TAO_DEF_MIOP_MAJOR & 0xf) << 4) + + (TAO_DEF_MIOP_MINOR & 0xf)); + // Write flags octet: + // Bit Description + // 0 Endian + // 1 Stop message flag (Assigned later) + // 2-7 Set to 0 + CORBA::Octet *flags_field = + reinterpret_cast<CORBA::Octet *> (miop_hdr.current ()->wr_ptr ()); + miop_hdr.write_octet (TAO_ENCAP_BYTE_ORDER); + CORBA::UShort *packet_length = + reinterpret_cast<CORBA::UShort *> (miop_hdr.current ()->wr_ptr ()); + miop_hdr.write_short (0); + CORBA::ULong *packet_number = + reinterpret_cast<CORBA::ULong *> (miop_hdr.current ()->wr_ptr ()); + miop_hdr.write_ulong (0u); + miop_hdr.write_ulong (number_of_packets_required); + if (!this->write_unique_id (miop_hdr)) { - ACE_CDR::swap_4 (&buf[MIOP_ID_LENGTH_OFFSET], - reinterpret_cast<char*> (&id_length)); + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::send, ") + ACE_TEXT ("error creating fragment MIOP header.\n"), + this->id ())); + return -1; } -#else - id_length = *reinterpret_cast<ACE_CDR::ULong*> (&buf[MIOP_ID_LENGTH_OFFSET]); -#endif /* ACE_DISABLE_SWAP_ON_READ */ - // Make sure that the length field is legal. - if (id_length > MIOP_MAX_LENGTH_ID || - static_cast<ssize_t> (MIOP_ID_CONTENT_OFFSET + id_length) > n) + // Attempt to partition up the payload data sending each of the MIOP fragments + iovec this_fragment_iov[ACE_IOV_MAX]; + UIPMC_Message_Block_Data_Iterator mb_iter (iov, iovcnt); + ACE_INET_Addr const &addr = this->connection_handler_->addr (); + for (*packet_number= 0u; + *packet_number < number_of_packets_required; + ++*packet_number) { - if (TAO_debug_level > 0) + // The first iov for each packet points at the static MIOP header. + this_fragment_iov[0].iov_base = miop_hdr.current ()->rd_ptr (); + this_fragment_iov[0].iov_len = MIOP_DEFAULT_HEADER_SIZE; + int this_fragment_iovcnt = 1; // Just the MIOP Header so far! + u_long this_fragment_size= 0uL; // Just the payload data length (for now) + + // Obtain the next fragment's payload data. + while (mb_iter.next_block (max_fragment_payload - this_fragment_size, + this_fragment_iov[this_fragment_iovcnt])) { - ACE_DEBUG ((LM_DEBUG, - "TAO_UIPMC_Transport::recv: Invalid ID length.\n")); - } + // Increment the fragments length and iovcnt. + this_fragment_size += + this_fragment_iov[this_fragment_iovcnt++].iov_len; - return 0; - } + // Check if we have maxed out this fragment's payload. + if (this_fragment_size == max_fragment_payload) + break; - // Trim off the header for now. - ssize_t const miop_header_size = (MIOP_ID_CONTENT_OFFSET + id_length + 7) & ~0x7; - if (miop_header_size > n) - { - if (TAO_debug_level > 0) + // Just a safety check for building iovec. + if (this_fragment_iovcnt >= ACE_IOV_MAX) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::send, ") + ACE_TEXT ("Too many iovec to create fragment.\n"), + this->id ())); + return -1; + } + } // While fragment is not complete + + // Now we have the payload length for this fragment, update the MIOP header + *packet_length = static_cast<CORBA::UShort> (this_fragment_size); + if (*packet_number == number_of_packets_required-1uL) + *flags_field |= 0x02; + + ssize_t already_sent = 0; // No data sent yet! + iovec *current_iov= this_fragment_iov; + for (this_fragment_size+= MIOP_DEFAULT_HEADER_SIZE; // Now includes MIOP header + this_fragment_size; // Still any data to send + this_fragment_size-= static_cast<u_long> (already_sent)) { - ACE_DEBUG ((LM_DEBUG, - "TAO_UIPMC_Transport::recv: MIOP packet not large enough for padding.\n")); - } + // Make sure we don't send our fragments too quickly + this->throttle_send_rate ( + factory->max_fragment_rate (), + max_fragment_size, + this_fragment_size); + + // Haven't sent some of the data yet, we need to adjust the fragments iov's + // to skip the data we have actually manage to send so far. + while (already_sent) + if (current_iov->iov_len <= static_cast<u_long> (already_sent)) + { + // This whole iov has been sent, simply skip over it + already_sent-= current_iov->iov_len; + --this_fragment_iovcnt; + ++current_iov; + } + else + { + // This iov has been partially sent, adjust it's data + // to skip over those bytes already transmitted. + current_iov->iov_len -= static_cast<u_long> (already_sent); + current_iov->iov_base = + &static_cast<char *> (current_iov->iov_base)[already_sent]; + break; // already_sent= 0; + } + + // Ok now we attempt to actually send the fragment. + already_sent = + this->connection_handler_->peer ().send ( + current_iov, + this_fragment_iovcnt, + addr); + if (already_sent < 0) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::send, ") + ACE_TEXT ("error sending data '%m'\n"), + this->id ())); + return -1; + } + else if (TAO_debug_level && + static_cast<u_long> (already_sent) != this_fragment_size) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::send, ") + ACE_TEXT ("Partial fragment (%B/%u bytes), ") + ACE_TEXT ("reattempting remainder.\n"), + this->id (), + already_sent, + this_fragment_size)); + } - return 0; - } + // Keep a note of the number of bytes we have just buffered + this->total_bytes_outstanding_+= static_cast<u_long> (already_sent); + } // Keep sending the rest of the fragment - n -= miop_header_size; - ACE_OS::memmove (buf, buf + miop_header_size, n); + // Increment the number of bytes of payload transferred. + bytes_transferred += *packet_length; - return n; -} - -template<typename CONNECTION_HANDLER> -int -TAO_UIPMC_Transport<CONNECTION_HANDLER>::handle_input (TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) -{ - // If there are no messages then we can go ahead to read from the - // handle for further reading.. - - // The buffer on the stack which will be used to hold the input - // messages - char buf [MIOP_MAX_DGRAM_SIZE]; - -#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE) - (void) ACE_OS::memset (buf, - '\0', - sizeof buf); -#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */ - - // Create a data block - ACE_Data_Block db (sizeof (buf), - ACE_Message_Block::MB_DATA, - buf, - this->orb_core_->input_cdr_buffer_allocator (), - this->orb_core_->locking_strategy (), - ACE_Message_Block::DONT_DELETE, - this->orb_core_->input_cdr_dblock_allocator ()); - - // Create a message block - ACE_Message_Block message_block (&db, - ACE_Message_Block::DONT_DELETE, - this->orb_core_->input_cdr_msgblock_allocator ()); - - - // Align the message block - ACE_CDR::mb_align (&message_block); - - - // Read the message into the message block that we have created on - // the stack. - ssize_t n = this->recv (message_block.rd_ptr (), - message_block.space (), - max_wait_time); - - // If there is an error return to the reactor.. - if (n <= 0) - { - if (TAO_debug_level) + if (9 <= TAO_debug_level) { + char tmp[INET6_ADDRSTRLEN]; + addr.get_host_addr (tmp, sizeof tmp); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) recv returned error on transport %d after fault %p\n"), + ACE_TEXT ("TAO (%P|%t) - UIPMC_Transport[%d]::send, ") + ACE_TEXT ("Sent %u bytes payload (fragment %u/%u) to <%C:%u>\n"), this->id (), - ACE_TEXT ("handle_input ()\n"))); - } - - if (n == -1) - this->tms_->connection_closed (); - - return n; - } - - // Set the write pointer in the stack buffer. - message_block.wr_ptr (n); - - // Make a node of the message block.. - TAO_Queued_Data qd (&message_block); - size_t mesg_length = 0; - - // Parse the incoming message for validity. The check needs to be - // performed by the messaging objects. - if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1) - { - if (TAO_debug_level) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) handle_input failed on transport %d after fault\n"), - this->id () )); - } - - return -1; - } - - if (message_block.length () > mesg_length) - { - if (TAO_debug_level) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) handle_input failed on transport %d after fault\n"), - this->id () )); + *packet_length, + *packet_number + 1uL, + number_of_packets_required, + tmp, + addr.get_port_number ())); } + } // Send next fragment - return -1; - } - - // NOTE: We are not performing any queueing nor any checking for - // missing data. We are assuming that ALL the data would be got in a - // single read. + // Return total bytes transferred. + return bytes_transferred; +} - // Process the message - return this->process_parsed_messages (&qd, rh); +ssize_t +TAO_UIPMC_Transport::recv ( + char *, + size_t, + const ACE_Time_Value *) +{ + // Shouldn't ever be called on the client side. + ACE_ASSERT (0); + return -1; } -template<typename CONNECTION_HANDLER> int -TAO_UIPMC_Transport<CONNECTION_HANDLER>::register_handler (void) +TAO_UIPMC_Transport::register_handler (void) { - // We never register register the handler with the reactor + // We never register the handler with the reactor // as we never need to be informed about any incoming data, // assuming we only use one-ways. // If we would register and ICMP Messages would arrive, e.g @@ -537,36 +432,36 @@ TAO_UIPMC_Transport<CONNECTION_HANDLER>::register_handler (void) return 0; } -template<typename CONNECTION_HANDLER> int -TAO_UIPMC_Transport<CONNECTION_HANDLER>::send_request (TAO_Stub *stub, - TAO_ORB_Core *orb_core, - TAO_OutputCDR &stream, - TAO_Message_Semantics message_semantics, - ACE_Time_Value *max_wait_time) +TAO_UIPMC_Transport::send_request ( + TAO_Stub *stub, + TAO_ORB_Core *orb_core, + TAO_OutputCDR &stream, + TAO_Message_Semantics message_semantics, + ACE_Time_Value *max_wait_time) { if (this->ws_->sending_request (orb_core, - message_semantics) == -1) - return -1; - - if (this->send_message (stream, + message_semantics) == -1 || + this->send_message (stream, stub, 0, message_semantics, max_wait_time) == -1) + { + return -1; + } - return -1; - + this->first_request_sent(); return 0; } -template<typename CONNECTION_HANDLER> int -TAO_UIPMC_Transport<CONNECTION_HANDLER>::send_message (TAO_OutputCDR &stream, - TAO_Stub *stub, - TAO_ServerRequest *request, - TAO_Message_Semantics message_semantics, - ACE_Time_Value *max_wait_time) +TAO_UIPMC_Transport::send_message ( + TAO_OutputCDR &stream, + TAO_Stub *stub, + TAO_ServerRequest *request, + TAO_Message_Semantics message_semantics, + ACE_Time_Value *max_wait_time) { // Format the message in the stream first if (this->messaging_object ()->format_message (stream, stub, request) != 0) @@ -579,21 +474,19 @@ TAO_UIPMC_Transport<CONNECTION_HANDLER>::send_message (TAO_OutputCDR &stream, // versions seem to need it though. Leaving it costs little. // This guarantees to send all data (bytes) or return an error. - ssize_t n = this->send_message_shared (stub, - message_semantics, - stream.begin (), - max_wait_time); + ssize_t const n = this->send_message_shared (stub, + message_semantics, + stream.begin (), + max_wait_time); if (n == -1) { if (TAO_debug_level) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %m\n"), - this->id (), - ACE_TEXT ("send_message ()\n"))); - } - + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO: (%P|%t) - UIPMC_Transport[%d]::") + ACE_TEXT ("send_message, closing transport %d after ") + ACE_TEXT ("fault '%m'\n"), + this->id ())); return -1; } @@ -601,5 +494,3 @@ TAO_UIPMC_Transport<CONNECTION_HANDLER>::send_message (TAO_OutputCDR &stream, } TAO_END_VERSIONED_NAMESPACE_DECL - -#endif /* TAO_UIPMC_TRANSPORT_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.h index 1a2e3b4cc69..e519e886140 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.h @@ -24,35 +24,30 @@ #include "ace/SOCK_Stream.h" #include "ace/Svc_Handler.h" +#include "ace/Refcountable_T.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL // Forward decls. class TAO_ORB_Core; -class TAO_Operation_Details; -class TAO_Acceptor; +class TAO_UIPMC_Connection_Handler; /** * @class TAO_UIPMC_Transport * * @brief Specialization of the base TAO_Transport class to handle the - * MIOP protocol. + * client side MIOP protocol. */ -template<typename CONNECTION_HANDLER> -class TAO_UIPMC_Transport : public TAO_Transport +class TAO_PortableGroup_Export TAO_UIPMC_Transport : public TAO_Transport { public: - /// Constructor. - TAO_UIPMC_Transport (CONNECTION_HANDLER *handler, + TAO_UIPMC_Transport (TAO_UIPMC_Connection_Handler *handler, TAO_ORB_Core *orb_core); /// Default destructor. ~TAO_UIPMC_Transport (void); - /// Look for the documentation in Transport.h. - virtual int handle_input (TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time = 0); protected: /** @name Overridden Template Methods * @@ -60,7 +55,7 @@ protected: */ //@{ - virtual ACE_Event_Handler * event_handler_i (void); + virtual ACE_Event_Handler *event_handler_i (void); virtual TAO_Connection_Handler *connection_handler_i (void); /// Write the complete Message_Block chain to the connection. @@ -68,11 +63,10 @@ protected: size_t &bytes_transferred, const ACE_Time_Value *max_wait_time); - - /// Read len bytes from into buf. + /// Shouldn't ever be called on the client side (read len bytes into buf). virtual ssize_t recv (char *buf, size_t len, - const ACE_Time_Value *s = 0); + ACE_Time_Value const *s = 0); virtual int register_handler (void); @@ -95,24 +89,29 @@ public: private: /// Construct and write a unique ID to the MIOP header. - void write_unique_id (TAO_OutputCDR &miop_hdr, unsigned long unique); + bool write_unique_id (TAO_OutputCDR &miop_hdr) const; -private: + /// Throttle back clients send rate so as to not exceed client/server buffers + /// and servant message processing time. + void throttle_send_rate ( + ACE_UINT64 max_fragment_rate, + u_long max_fragment_size, + u_long this_send_size); /// The connection service handler used for accessing lower layer /// communication protocols. - CONNECTION_HANDLER *connection_handler_; -}; + TAO_UIPMC_Connection_Handler *connection_handler_; -TAO_END_VERSIONED_NAMESPACE_DECL + /// This UUID used by client for making unique MIOP IDs. + u_long uuid_hash_; -#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) -#include "orbsvcs/PortableGroup/UIPMC_Transport.cpp" -#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + /// On the client side the total amount of outstanding data being + /// transmitted and the time when this was last updated. + u_long total_bytes_outstanding_; + ACE_Time_Value time_last_sent_; +}; -#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) -#pragma implementation ("orbsvcs/PortableGroup/UIPMC_Transport.cpp") -#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ +TAO_END_VERSIONED_NAMESPACE_DECL #include /**/ "ace/post.h" #endif /* TAO_UIPMC_TRANSPORT_H */ diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.cpp new file mode 100644 index 00000000000..c9547808179 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.cpp @@ -0,0 +1,130 @@ +// $Id$ + +#include "orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.h" +#include "orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h" + +#include "ace/OS_NS_sys_time.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO_PG +{ + + UIPMC_Recv_Packet_Cleanup_Guard::UIPMC_Recv_Packet_Cleanup_Guard ( + TAO_UIPMC_Mcast_Transport *transport + ) + : transport_ (transport) + { + } + + UIPMC_Recv_Packet_Cleanup_Guard::~UIPMC_Recv_Packet_Cleanup_Guard (void) + { + // Cleanup only expired packets. + this->transport_->cleanup_packets (true); + } + + UIPMC_Recv_Packet::UIPMC_Recv_Packet (void) + : last_fragment_id_ (0) + , data_length_ (0) + , started_ (ACE_OS::gettimeofday ()) + { + } + + UIPMC_Recv_Packet::~UIPMC_Recv_Packet (void) + { + for (Fragments_Map::iterator iter = this->fragments_.begin (); + iter != this->fragments_.end (); + ++iter) + { + delete [] (*iter).item ().buf; + } + } + + int + UIPMC_Recv_Packet::add_fragment (char *data, + CORBA::UShort len, + CORBA::ULong id, + bool is_last) + { + Fragment new_data; + ACE_NEW_RETURN (new_data.buf, + char[len], + -1); + ACE_OS::memcpy (new_data.buf, data, len); + new_data.len = len; + + if (is_last) + this->last_fragment_id_ = id; + + this->data_length_ += len; + + if (TAO_debug_level >= 10) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO_PG::UIPMC_Recv_Packet::") + ACE_TEXT ("add_fragment, adding fragment %d with %d out ") + ACE_TEXT ("of %d bytes\n"), + id, + len, + this->data_length_)); + + if (this->fragments_.bind (id, new_data) != 0) + { + // We've failed to add a new fragment. It's an error no matter + // what was the reason. Mark the packet as expired. + this->started_ = ACE_Time_Value::zero; + delete [] new_data.buf; + return -1; + } + + // We haven't encountered yet the last fragment. + if (!is_last && this->last_fragment_id_ == 0) + return 0; + + // We haven't encountered yet all the fragments but the last one is + // already in. + if (this->last_fragment_id_ + 1 != this->fragments_.current_size ()) + return 0; + + // Since fragments are enumerated from 0 to last_fragment_id_ this + // is the heaviest but the most reliable check for packet completeness. + for (CORBA::ULong id = 0; id <= this->last_fragment_id_; ++id) + { + if (this->fragments_.find (id) == -1) + { + // Mark the packet as if it timedout. + this->started_ = ACE_Time_Value::zero; + return 0; + } + } + + return 1; + } + + ACE_Time_Value const & + UIPMC_Recv_Packet::started (void) const + { + return this->started_; + } + + CORBA::ULong + UIPMC_Recv_Packet::data_length (void) const + { + return this->data_length_; + } + + void + UIPMC_Recv_Packet::copy_data (char *buf) const + { + for (CORBA::ULong id = 0; id <= this->last_fragment_id_; ++id) + { + Fragment f = { 0, 0 }; + this->fragments_.find (id, f); + + ACE_OS::memcpy (buf, f.buf, f.len); + buf += f.len; + } + } + +} // namespace TAO_PG + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.h b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.h new file mode 100644 index 00000000000..c117c8bf57e --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.h @@ -0,0 +1,111 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file UIPMC_Transport_Recv_Packet.h + * + * $Id$ + * + * @author Vladimir Zykov <vz@prismtech.com> + */ +//============================================================================= + +#ifndef TAO_UIPMC_TRANSPORT_RECV_PACKET_H +#define TAO_UIPMC_TRANSPORT_RECV_PACKET_H + +#include /**/ "ace/pre.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/orbconf.h" +#include "tao/Versioned_Namespace.h" + +#include "tao/corba.h" + +class TAO_UIPMC_Mcast_Transport; + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO_PG +{ + + /** + * @class UIPMC_Recv_Packet_Cleanup_Guard + * + * @brief A guard that will cleanup broken/expired packets. + */ + class UIPMC_Recv_Packet_Cleanup_Guard + { + public: + UIPMC_Recv_Packet_Cleanup_Guard (TAO_UIPMC_Mcast_Transport *transport); + + ~UIPMC_Recv_Packet_Cleanup_Guard (void); + + private: + TAO_UIPMC_Mcast_Transport *transport_; + }; + + /** + * @class UIPMC_Recv_Packet + * + * @brief A MIOP packet for receiving. + */ + class UIPMC_Recv_Packet + { + public: + /// Constructs a new recv packet. + UIPMC_Recv_Packet (void); + + ~UIPMC_Recv_Packet (void); + + /// Adds a new fragment to the packet and if it fails marks the packet + /// as broken. + /// Returns 1 if all fragments are in AND if there are no fragments + /// with unexpected IDs. In case unexpected IDs are encountered the + /// packet is marked as broken. + int add_fragment (char *data, CORBA::UShort len, + CORBA::ULong id, bool is_last); + + /// Returns the time when the first fragment was received or + /// ACE_Time_Value::zero if the whole packet was not able to + /// reconstruct for some reason. + ACE_Time_Value const &started (void) const; + + CORBA::ULong data_length (void) const; + + /// Copies fragments to buf. Caller ensures that the buf is big enough + /// for all fragments. + void copy_data (char *buf) const; + + private: + /// The id of the last fragment. + CORBA::ULong last_fragment_id_; + + /// The length of the data stored in all fragments. + CORBA::ULong data_length_; + + /// The time when the packet will expire. + mutable ACE_Time_Value started_; + + /// Fragments. + struct Fragment + { + char *buf; + CORBA::UShort len; + }; + + typedef ACE_Hash_Map_Manager<CORBA::ULong, + Fragment, + ACE_SYNCH_NULL_MUTEX> Fragments_Map; + Fragments_Map fragments_; + }; + +} // namespace TAO_PG + +TAO_END_VERSIONED_NAMESPACE_DECL + +#include /**/ "ace/post.h" + +#endif // TAO_UIPMC_TRANSPORT_RECV_PACKET_H diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp new file mode 100644 index 00000000000..f61ff35a2f3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.cpp @@ -0,0 +1,356 @@ +// $Id$ + +#include "orbsvcs/PortableGroup/miop_resource.h" +#include "orbsvcs/PortableGroup/miopconf.h" +#include "orbsvcs/PortableGroup/Fragments_Cleanup_Strategy.h" + +#include "tao/debug.h" + +#include "ace/OS_NS_strings.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO_MIOP_Resource_Factory::TAO_MIOP_Resource_Factory (void) + : fragments_cleanup_strategy_type_ (TAO_MIOP_CLEANUP_TIME_BOUND) + , fragments_cleanup_bound_ (-1) + , fragments_cleanup_strategy_ (0) + , max_fragments_ (TAO_DEFAULT_MIOP_MAX_FRAGMENTS) + , max_fragment_size_ (TAO_DEFAULT_MIOP_FRAGMENT_SIZE) + , max_fragment_rate_ (0u) // Zero uses max_fragment_size_ instead. + , send_hi_water_mark_ (0u) // Zero sets this to actual -ORBSndSock + , send_buffer_size_ (0u) // Zero is unspecified (-ORBSndSock). + , receive_buffer_size_ (0u) // Zero is unspecified (-ORBRcvSock). +{ +} + +TAO_MIOP_Resource_Factory::~TAO_MIOP_Resource_Factory (void) +{ + delete this->fragments_cleanup_strategy_; +} + +int +TAO_MIOP_Resource_Factory::init (int argc, ACE_TCHAR *argv[]) +{ + ACE_TRACE ("TAO_MIOP_Resource_Factory::init"); + + for (int curarg = 0; curarg < argc; ++curarg) + { + if (ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBFragmentsCleanupStrategy")) == 0) + { + if (++curarg < argc) + { + ACE_TCHAR* name = argv[curarg]; + + if (ACE_OS::strcasecmp (name, + ACE_TEXT ("delay")) == 0) + { + this->fragments_cleanup_strategy_type_ = + TAO_MIOP_CLEANUP_TIME_BOUND; + } + else if (ACE_OS::strcasecmp (name, + ACE_TEXT ("number")) == 0) + { + this->fragments_cleanup_strategy_type_ = + TAO_MIOP_CLEANUP_NUMBER_BOUND; + } + else if (ACE_OS::strcasecmp (name, + ACE_TEXT ("memory")) == 0) + { + this->fragments_cleanup_strategy_type_ = + TAO_MIOP_CLEANUP_MEMORY_BOUND; + } + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBFragmentsCleanupStrategy %s is unknown.\n"), + name)); + } + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBFragmentsCleanupStrategy missing type.\n"))); + } + else if (ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBFragmentsCleanupBound")) == 0) + { + if (++curarg < argc) + this->fragments_cleanup_bound_ = ACE_OS::atoi (argv[curarg]); + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBFragmentsCleanupBound missing value.\n"))); + } + else if (ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBMaxFragments")) == 0) + { + if (++curarg < argc) + { + int const max= ACE_OS::atoi (argv[curarg]); + if (max < 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBMaxFragments %d ") + ACE_TEXT ("is invalid (using %u).\n"), + max, + TAO_DEFAULT_MIOP_MAX_FRAGMENTS)); + this->max_fragments_ = TAO_DEFAULT_MIOP_MAX_FRAGMENTS; + } + else + this->max_fragments_ = static_cast<u_long> (max); + } + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBMaxFragments missing limit.\n"))); + } + else if (ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBMaxFragmentSize")) == 0) + { + if (++curarg < argc) + { + int const size= ACE_OS::atoi (argv[curarg]); + if (size < static_cast<int> (MIOP_MAX_HEADER_SIZE) || + size > static_cast<int> (MIOP_MAX_DGRAM_SIZE) ) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBMaxFragmentSize %d is not within ") + ACE_TEXT ("range %u to %u (using %u).\n"), + size, + MIOP_MAX_HEADER_SIZE, + MIOP_MAX_DGRAM_SIZE, + TAO_DEFAULT_MIOP_FRAGMENT_SIZE)); + this->max_fragment_size_ = TAO_DEFAULT_MIOP_FRAGMENT_SIZE; + } + else + this->max_fragment_size_ = static_cast<u_long> (size); + } + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBMaxFragmentSize missing limit.\n"))); + } + else if (ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBMaxFragmentRate")) == 0) + { + if (++curarg < argc) + { + int const tx_time= ACE_OS::atoi (argv[curarg]); + if (tx_time <= 0) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBMaxFragmentRate %d is invalid ") + ACE_TEXT ("(using -ORBMaxFragmentSize in micro seconds).\n"), + tx_time)); + this->max_fragment_rate_= 0u; // Zero uses configured max_fragment_size_ + } + else + this->max_fragment_rate_= static_cast<u_long> (tx_time); + } + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBMaxFragmentRate missing micro-seconds.\n"))); + } + else if (ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBSendHighWaterMark")) == 0 || + ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBSendHiWaterMark")) == 0 || + ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBSendHWM")) == 0 || + ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBSndHighWaterMark")) == 0 || + ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBSndHiWaterMark")) == 0 || + ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBSndHWM")) == 0) + { + if (++curarg < argc) + { + int const hwm= ACE_OS::atoi (argv[curarg]); + if (hwm <= 0) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("%s %d is invalid ") + ACE_TEXT ("(using actual -ORBSndSock size).\n"), + argv[curarg-1], + hwm, + TAO_DEFAULT_MIOP_MAX_FRAGMENTS)); + this->send_hi_water_mark_ = 0u; // Zero sets this to actual -ORBSndSock + } + else + this->send_hi_water_mark_ = static_cast<u_long> (hwm); + } + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("%s missing limit.\n"), + argv[curarg-1])); + } + else if (ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBSndSock")) == 0) + { + if (++curarg < argc) + { + int const bytes= ACE_OS::atoi (argv[curarg]); + if (bytes <= 0) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBSndSock %d is invalid.\n"), + bytes)); + this->send_buffer_size_= 0u; // Zero is unspecified + } + else + this->send_buffer_size_= static_cast<u_long> (bytes); + } + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBSndSock missing size in bytes.\n"))); + } + else if (ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT ("-ORBRcvSock")) == 0) + { + if (++curarg < argc) + { + int const bytes= ACE_OS::atoi (argv[curarg]); + if (bytes <= 0) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBRcvSock %d is invalid.\n"), + bytes)); + this->receive_buffer_size_= 0u; // Zero is unspecified + } + else + this->receive_buffer_size_= static_cast<u_long> (bytes); + } + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory ") + ACE_TEXT ("-ORBRcvSock missing size in bytes.\n"))); + } + else if (ACE_OS::strncmp (argv[curarg], ACE_TEXT ("-ORB"), 4) == 0) + { + // Can we assume there is an argument after the option? + // ++curarg; + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory - ") + ACE_TEXT ("unknown option <%s>.\n"), + argv[curarg])); + } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - MIOP_Resource_Factory - ") + ACE_TEXT ("ignoring option <%s>.\n"), + argv[curarg])); + } + } + + return 0; +} + +TAO_PG::Fragments_Cleanup_Strategy * +TAO_MIOP_Resource_Factory::fragments_cleanup_strategy (void) const +{ + if (this->fragments_cleanup_strategy_ == 0) + { + if (this->fragments_cleanup_strategy_type_ == + TAO_MIOP_CLEANUP_TIME_BOUND) + { + int bound = this->fragments_cleanup_bound_; + if (bound == -1) + bound = TAO_DEFAULT_MIOP_FRAGMENTS_CLEANUP_DELAY; + + ACE_NEW_RETURN (this->fragments_cleanup_strategy_, + TAO_PG::Time_Bound_Fragments_Cleanup_Strategy ( + bound), + 0); + } + else if (this->fragments_cleanup_strategy_type_ == + TAO_MIOP_CLEANUP_NUMBER_BOUND) + { + int bound = this->fragments_cleanup_bound_; + if (bound == -1) + bound = TAO_DEFAULT_MIOP_FRAGMENTS_CLEANUP_NUMBER; + + ACE_NEW_RETURN (this->fragments_cleanup_strategy_, + TAO_PG::Number_Bound_Fragments_Cleanup_Strategy ( + bound), + 0); + } + else if (this->fragments_cleanup_strategy_type_ == + TAO_MIOP_CLEANUP_MEMORY_BOUND) + { + int bound = this->fragments_cleanup_bound_; + if (bound == -1) + bound = TAO_DEFAULT_MIOP_FRAGMENTS_CLEANUP_MEMORY; + + ACE_NEW_RETURN (this->fragments_cleanup_strategy_, + TAO_PG::Memory_Bound_Fragments_Cleanup_Strategy ( + bound), + 0); + } + } + + return this->fragments_cleanup_strategy_; +} + +u_long +TAO_MIOP_Resource_Factory::max_fragment_size (void) const +{ + return this->max_fragment_size_; +} + +u_long +TAO_MIOP_Resource_Factory::max_fragments (void) const +{ + return this->max_fragments_; +} + +u_long +TAO_MIOP_Resource_Factory::max_fragment_rate (void) const +{ + // If "max_fragment_rate_" is not specified (i.e. zero) + // use the same value as "max_fragment_size_". + return this->max_fragment_rate_ ? + this->max_fragment_rate_ : + this->max_fragment_size_ ; +} + +u_long +TAO_MIOP_Resource_Factory::send_hi_water_mark (void) const +{ + return this->send_hi_water_mark_; +} + +u_long +TAO_MIOP_Resource_Factory::send_buffer_size (void) const +{ + return send_buffer_size_; +} + +u_long +TAO_MIOP_Resource_Factory::receive_buffer_size (void) const +{ + return receive_buffer_size_; +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +// **************************************************************** + +ACE_STATIC_SVC_DEFINE (TAO_MIOP_Resource_Factory, + ACE_TEXT ("MIOP_Resource_Factory"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (TAO_MIOP_Resource_Factory), + ACE_Service_Type::DELETE_THIS + | ACE_Service_Type::DELETE_OBJ, + 0) +ACE_FACTORY_DEFINE (TAO_PortableGroup, TAO_MIOP_Resource_Factory) diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h new file mode 100644 index 00000000000..a4cfac1b51d --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/miop_resource.h @@ -0,0 +1,133 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file miop_resource.h + * + * $Id$ + * + * @author Vladimir Zykov <vz@prismtech.com> + */ +//============================================================================= + + +#ifndef TAO_MIOP_RESOURCE_H +#define TAO_MIOP_RESOURCE_H + +#include /**/ "ace/pre.h" + +#include "orbsvcs/PortableGroup/portablegroup_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/orbconf.h" +#include "tao/Versioned_Namespace.h" + +#include "ace/Service_Object.h" +#include "ace/Service_Config.h" +#include "ace/Time_Value.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO_PG +{ + class Fragments_Cleanup_Strategy; +} + +/** + * @class TAO_MIOP_Resource_Factory + * + * @brief TAO's MIOP resource factory + * + * Using a <{resource source specifier}> as a discriminator, the + * factory can return resource instances which are, e.g., global, + * stored in thread-specific storage, stored in shared memory, + * etc. + */ +class TAO_PortableGroup_Export TAO_MIOP_Resource_Factory + : public ACE_Service_Object +{ +public: + + /// Constructor. + TAO_MIOP_Resource_Factory (void); + + /// Destructor. + virtual ~TAO_MIOP_Resource_Factory (void); + + /** + * @name Service Configurator Hooks + */ + //@{ + /// Dynamic linking hook + virtual int init (int argc, ACE_TCHAR *argv[]); + //@} + + /** + * @name Member Accessors + */ + //@{ + TAO_PG::Fragments_Cleanup_Strategy *fragments_cleanup_strategy (void) const; + + /// Get Maximum number of fragments allowed. + u_long max_fragments (void) const; + + /// Get MTU value (roughly). + u_long max_fragment_size (void) const; + + /// Get time required for transfering one maximum sized fragment. + u_long max_fragment_rate (void) const; + + /// Get number of bytes that can be sent without delay. + u_long send_hi_water_mark (void) const; + + /// Get the desired socket transmit buffer's size in bytes (Zero is unspecified). + u_long send_buffer_size (void) const; + + /// Get the desired socket receive buffer's size in bytes (Zero is unspecified). + u_long receive_buffer_size (void) const; + //@} + +private: + + enum Fragments_Cleanup_Strategy_Type + { + TAO_MIOP_CLEANUP_TIME_BOUND, + TAO_MIOP_CLEANUP_NUMBER_BOUND, + TAO_MIOP_CLEANUP_MEMORY_BOUND + }; + + Fragments_Cleanup_Strategy_Type fragments_cleanup_strategy_type_; + + int fragments_cleanup_bound_; + + mutable TAO_PG::Fragments_Cleanup_Strategy *fragments_cleanup_strategy_; + + /// Maximum number of fragments. + u_long max_fragments_; + + /// Maximum Size of a single fragment. + u_long max_fragment_size_; + + /// Time required for transfering one maximum sized fragment. + u_long max_fragment_rate_; + + /// Number of bytes that can be sent without delay. + u_long send_hi_water_mark_; + + /// Get the desired socket send buffer's size in bytes. + u_long send_buffer_size_; + + /// Get the desired socket receive buffer's size in bytes. + u_long receive_buffer_size_; +}; + +TAO_END_VERSIONED_NAMESPACE_DECL + +ACE_STATIC_SVC_DECLARE (TAO_MIOP_Resource_Factory) +ACE_FACTORY_DECLARE (TAO_PortableGroup, TAO_MIOP_Resource_Factory) + +#include /**/ "ace/post.h" +#endif /* TAO_MIOP_RESOURCE_H */ diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h b/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h index 44db690f7b2..b45947b340a 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/miopconf.h @@ -24,6 +24,8 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "tao/corba.h" + // This is the version of the MIOP spec that TAO supports. The // exact usage of the version has not been emphasized. But TAO should // get a TaggedComponents for a group with this version number. So, for @@ -37,5 +39,68 @@ #define TAO_DEF_MIOP_MINOR 0 #endif /* TAO_DEF_MIOP_MINOR */ +// MIOP Definitions: + +// Default value for the delay in milliseconds before a fragmented +// MIOP message will be considered as expired on the server. +#if !defined (TAO_DEFAULT_MIOP_FRAGMENTS_CLEANUP_DELAY) +static int const TAO_DEFAULT_MIOP_FRAGMENTS_CLEANUP_DELAY = 1000; +#endif + +// Default number of incomplete messages waiting for reassembly before +// cleanup on the server side. +#if !defined (TAO_DEFAULT_MIOP_FRAGMENTS_CLEANUP_NUMBER) +static int const TAO_DEFAULT_MIOP_FRAGMENTS_CLEANUP_NUMBER = 5; +#endif + +// Default memory taken by incomplete messages waiting for reassembly +// before cleanup on the server side. +#if !defined (TAO_DEFAULT_MIOP_FRAGMENTS_CLEANUP_MEMORY) +static int const TAO_DEFAULT_MIOP_FRAGMENTS_CLEANUP_MEMORY = 3000000; +#endif + +// We don't use the constants below but let they stay for the documentation. +// static u_long const MIOP_MAGIC_OFFSET = 0u; +// static u_long const MIOP_VERSION_OFFSET = 4u; +// static u_long const MIOP_FLAGS_OFFSET = 5u; +// static u_long const MIOP_PACKET_LENGTH_OFFSET = 6u; +// static u_long const MIOP_PACKET_NUMBER_OFFSET = 8u; +// static u_long const MIOP_NUMBER_OF_PACKETS_OFFSET = 12u; +// static u_long const MIOP_ID_LENGTH_OFFSET = 16u; +static u_long const MIOP_ID_CONTENT_OFFSET = 20u; + +// Max ID length 252 is defined in the MIOP spec. We use 12. +static u_long const MIOP_MIN_ID_LENGTH = 0u; +static u_long const MIOP_MAX_ID_LENGTH = 252u; +static u_long const MIOP_DEFAULT_ID_LENGTH = 12u; + +// MIOP_DEFAULT_HEADER_SIZE, MIOP_MIN_HEADER_SIZE and MIOP_MAX_HEADER_SIZE +// must be multiple of 8. +static u_long const MIOP_DEFAULT_HEADER_SIZE = + MIOP_ID_CONTENT_OFFSET + MIOP_DEFAULT_ID_LENGTH + + (MIOP_ID_CONTENT_OFFSET + MIOP_DEFAULT_ID_LENGTH) % 8u; +static u_long const MIOP_MIN_HEADER_SIZE = + MIOP_ID_CONTENT_OFFSET + MIOP_MIN_ID_LENGTH + + (MIOP_ID_CONTENT_OFFSET + MIOP_MIN_ID_LENGTH) % 8u; +static u_long const MIOP_MAX_HEADER_SIZE = + MIOP_ID_CONTENT_OFFSET + MIOP_MAX_ID_LENGTH + + (MIOP_ID_CONTENT_OFFSET + MIOP_MAX_ID_LENGTH) % 8u; + +static u_long const MIOP_MAX_DGRAM_SIZE = ACE_MAX_UDP_PACKET_SIZE; + +// Default value for the size of MIOP fragment used by the client. +// This can be considered same as MTU. +#if !defined (TAO_DEFAULT_MIOP_FRAGMENT_SIZE) +static u_long const TAO_DEFAULT_MIOP_FRAGMENT_SIZE = MIOP_MAX_DGRAM_SIZE; +#endif + +#if !defined (TAO_DEFAULT_MIOP_MAX_FRAGMENTS) +static u_long const TAO_DEFAULT_MIOP_MAX_FRAGMENTS = 0u; // Zero is unlimited +#endif + +static CORBA::Octet const miop_magic[4] = { + 0x4d, 0x49, 0x4f, 0x50 +}; // in ASCII this is 'M', 'I', 'O', 'P' + #include /**/ "ace/post.h" #endif /*TAO_MIOPCONF_H*/ diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/Hello.idl b/TAO/orbsvcs/tests/Miop/McastFragmentation/Hello.idl new file mode 100644 index 00000000000..d7776aa5691 --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/Hello.idl @@ -0,0 +1,18 @@ +// $Id$ + +module Test { + typedef sequence<octet> Octets; + + const string ClientIDs = + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + + interface UIPMC_Object { + oneway void process (in Octets payload); + }; + + interface Hello { + UIPMC_Object get_object (); + + oneway void shutdown (); + }; +}; diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/Hello_Impl.cpp b/TAO/orbsvcs/tests/Miop/McastFragmentation/Hello_Impl.cpp new file mode 100644 index 00000000000..906ffd17ccc --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/Hello_Impl.cpp @@ -0,0 +1,112 @@ +// +// $Id$ +// + +#include "Hello_Impl.h" + +UIPMC_Object_Impl::UIPMC_Object_Impl (CORBA::ULong payload, + CORBA::ULong clients, + CORBA::ULong calls) + : payload_ (payload) + , clients_ (clients) + , calls_ (calls) +{ +} + +UIPMC_Object_Impl::~UIPMC_Object_Impl (void) +{ + if (this->received_.current_size () == 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("ERROR: expected %d clients but only %d encountered\n"), + this->clients_, this->received_.current_size ())); + return; + } + + for (CORBA::ULong i = 0; i < this->clients_; ++i) + { + CORBA::ULong count = 0; + this->received_.find (Test::ClientIDs[i], count); + + if (count != this->calls_) + // This perfectly ok for MIOP to lose messages. + // So, this is not an error. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("DEBUG: expected %d messages from '%c' client ") + ACE_TEXT ("but only %d encountered\n"), + this->calls_, Test::ClientIDs[i], count)); + } +} + +void +UIPMC_Object_Impl::process (Test::Octets const &payload) +{ + if (this->payload_ != payload.length ()) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("ERROR: expected %d but received %d ") + ACE_TEXT ("sequence length\n"), + this->payload_, payload.length ())); + return; + } + + CORBA::ULong count = 0; + CORBA::Octet c = payload[0]; + this->received_.find (c, count); + + Test::Octets seq (this->payload_); + seq.length (this->payload_); + CORBA::Octet *buff = seq.get_buffer (); + ACE_OS::memset (buff, c, this->payload_); + char *one = static_cast<char *> (static_cast<void *> (buff)); + char const *two = static_cast<char const *> ( + static_cast<void const *> (payload.get_buffer ())); + if (ACE_OS::strncmp (one, two, this->payload_) != 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("ERROR: received malformed message from client '%c'\n"), + c)); + return; + } + + if (ACE_OS::strchr (Test::ClientIDs, c) == 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("ERROR: client id '%c' doesn't match any known value\n"), + c)); + return; + } + + ++count; + if (this->received_.rebind (c, count) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("ERROR: cann't rebind received count\n"))); + } +} + +Hello_Impl::Hello_Impl (CORBA::ORB_ptr orb, + Test::UIPMC_Object_ptr obj) + : orb_ (CORBA::ORB::_duplicate (orb)) + , obj_ (Test::UIPMC_Object::_duplicate (obj)) +{ +} + +Test::UIPMC_Object_ptr +Hello_Impl::get_object (void) +{ + return Test::UIPMC_Object::_duplicate (this->obj_.in ()); +} + +void +Hello_Impl::shutdown (void) +{ + try + { + this->orb_->shutdown (0); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught in shutdown():"); + } +} diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/Hello_Impl.h b/TAO/orbsvcs/tests/Miop/McastFragmentation/Hello_Impl.h new file mode 100644 index 00000000000..1b60a05f84c --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/Hello_Impl.h @@ -0,0 +1,52 @@ +// +// $Id$ +// + +#ifndef _HELLOS_IMPL_H_ +#define _HELLOS_IMPL_H_ + +#include "ace/Hash_Map_Manager_T.h" +#include "HelloS.h" + +class UIPMC_Object_Impl : public virtual POA_Test::UIPMC_Object +{ +public: + UIPMC_Object_Impl (CORBA::ULong payload, CORBA::ULong clients, CORBA::ULong calls); + + ~UIPMC_Object_Impl (void); + + // The skeleton methods + virtual void process (Test::Octets const &payload); + +private: + CORBA::ULong payload_; + + CORBA::ULong clients_; + + CORBA::ULong calls_; + + typedef ACE_Hash_Map_Manager<CORBA::Octet, + CORBA::ULong, + TAO_SYNCH_MUTEX> Client_Count_Map; + Client_Count_Map received_; +}; + + +class Hello_Impl : public virtual POA_Test::Hello +{ +public: + // Constructor + Hello_Impl (CORBA::ORB_ptr orb, Test::UIPMC_Object_ptr obj); + + // The skeleton methods + virtual Test::UIPMC_Object_ptr get_object (void); + + virtual void shutdown (void); + +private: + CORBA::ORB_var orb_; + + Test::UIPMC_Object_var obj_; +}; + +#endif // _HELLOS_IMPL_H_ diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/McastFragmentation.mpc b/TAO/orbsvcs/tests/Miop/McastFragmentation/McastFragmentation.mpc new file mode 100644 index 00000000000..58ffc92e8ef --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/McastFragmentation.mpc @@ -0,0 +1,36 @@ +// -*- MPC -*- +// $Id$ +// + +project(*IDL): taoidldefaults { + IDL_Files { + Hello.idl + } + custom_only = 1 +} + +project(*Server) : taoserver, portablegroup { + exename = server + after += *IDL + + Source_Files { + Hello_Impl.cpp + HelloC.cpp + HelloS.cpp + server.cpp + } + IDL_Files { + } +} + +project(*Client) : taoclient, portablegroup { + exename = client + after += *IDL + + Source_Files { + HelloC.cpp + client.cpp + } + IDL_Files { + } +} diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/README b/TAO/orbsvcs/tests/Miop/McastFragmentation/README new file mode 100644 index 00000000000..f4700129bbf --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/README @@ -0,0 +1,20 @@ +// $Id$ + +(Note this test came originally from Prismtech's tao764_regression which +was tied into tao764, tao836, tao840 and tao870 tickets.) + +Regression test for MIOP packet reassembly if a GIOP message is too big. + +Expected successful output is below. + +$> ./run_test.pl +MIOP object is <IOR:010000000100000000000000010000000300000040000000010100000d0000003232352e31302e31302e31300000bf4e01000000270000001c0000000101000007000000646f6d61696e0000010000000000000000000000> +Activated as <IOR:010000001300000049444c3a546573742f48656c6c6f3a312e3000000200000000000000800000000101020020000000777330332e6865616471756172746572732e65636c6970736573702e636f6d00c6ba00001b00000014010f005253545362344df51803000100000001000000020000000002000000000000000800000001000000004f41540100000018000000010000000100010001000000010001050901010000000000000000006400000001010200040000003a3a3100c6ba00001b00000014010f005253545362344df51803000100000001000000020000000002000000000000000800000001000000004f41540100000018000000010000000100010001000000010001050901010000000000> +args to be used: -k 'file://server.ior' -p 100000 -t 1 -f 0 -c 10 -s 1000 + +Client finished successfully +args to be used: -k 'file://server.ior' -p 1000 -t 5 -f 0 -c 100 -s 100 -x + +Client finished successfully + +Server finished successfully. diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/client.cpp b/TAO/orbsvcs/tests/Miop/McastFragmentation/client.cpp new file mode 100644 index 00000000000..e55c8874557 --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/client.cpp @@ -0,0 +1,194 @@ +// +// $Id$ +// + +#include "ace/Task.h" +#include "ace/Atomic_Op_T.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_unistd.h" +#include "HelloC.h" + +ACE_TCHAR const *ior = ACE_TEXT ("file://test.ior"); +CORBA::ULong payload_length = 1000; +CORBA::ULong client_threads = 5; +CORBA::ULong id_offset = 0; +CORBA::ULong payload_calls = 100; +CORBA::ULong sleep_millis = 100; +bool do_shutdown = false; + +int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, ACE_TEXT ("k:p:t:f:c:s:x")); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'k': + ior = get_opts.opt_arg (); + break; + + case 'p': + payload_length = ACE_OS::strtoul (get_opts.opt_arg (), 0, 10); + break; + + case 't': + client_threads = ACE_OS::strtoul (get_opts.opt_arg (), 0, 10); + break; + + case 'f': + id_offset = ACE_OS::strtoul (get_opts.opt_arg (), 0, 10); + break; + + case 'c': + payload_calls = ACE_OS::strtoul (get_opts.opt_arg (), 0, 10); + break; + + case 's': + sleep_millis = ACE_OS::strtoul (get_opts.opt_arg (), 0, 10); + break; + + case 'x': + do_shutdown = true; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("usage: %s ") + ACE_TEXT ("-k <ior> ") + ACE_TEXT ("-p <payload_length> ") + ACE_TEXT ("-t <client_threads> ") + ACE_TEXT ("-f <id_offset> ") + ACE_TEXT ("-c <payload_calls> ") + ACE_TEXT ("-s <sleep_millis> ") + ACE_TEXT ("-x") + ACE_TEXT ("\n"), + argv [0]), + -1); + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("args to be used: -k '%s' -p %d -t %d -f %d -c %d -s %d%s\n"), + ior, + payload_length, + client_threads, + id_offset, + payload_calls, + sleep_millis, + do_shutdown ? " -x" : "")); + + // Indicates sucessful parsing of the command line + return 0; +} + +class ClientThread : public ACE_Task_Base +{ +public: + ClientThread (Test::UIPMC_Object_ptr obj, CORBA::ULong payload, + CORBA::ULong offset, CORBA::ULong calls, + CORBA::ULong sleep) + : obj_ (Test::UIPMC_Object::_duplicate (obj)) + , payload_ (payload) + , calls_ (calls) + , id_ (offset) + , sleep_ (sleep) + { + } + + virtual int svc (void) + { + try + { + CORBA::ULong i = this->id_++; + + Test::Octets seq (this->payload_); + seq.length (this->payload_); + + CORBA::Octet *buff = seq.get_buffer (); + ACE_OS::memset (buff, Test::ClientIDs[i], this->payload_); + + for (CORBA::ULong j = 0; j < this->calls_; ++j) + { + this->obj_->process (seq); + } + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught in ClientThread:"); + return -1; + } + + return 0; + } + +private: + Test::UIPMC_Object_var obj_; + + CORBA::ULong payload_; + CORBA::ULong calls_; + + ACE_Atomic_Op <TAO_SYNCH_MUTEX, CORBA::ULong> id_; + + CORBA::ULong sleep_; +}; + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + CORBA::ORB_var orb = CORBA::ORB_init (argc, argv); + + if (parse_args (argc, argv) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("ERROR: wrong arguments\n")), + -1); + + if (id_offset + client_threads >= ACE_OS::strlen (Test::ClientIDs)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("ERROR: too many clients\n")), + -1); + + CORBA::Object_var obj = orb->string_to_object (ior); + + // Create Hello reference. + Test::Hello_var hello = Test::Hello::_narrow (obj.in ()); + + if (CORBA::is_nil (hello.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("ERROR: nil Hello object\n")), + -1); + + if (do_shutdown) + hello->shutdown (); + else + { + Test::UIPMC_Object_var uipmc_obj = hello->get_object (); + + { + // start clients + ClientThread cln_thr (uipmc_obj.in (), payload_length, + id_offset, payload_calls, sleep_millis); + cln_thr.activate (THR_NEW_LWP | THR_JOINABLE, client_threads); + cln_thr.wait (); + } + + // Give a chance to flush all OS buffers for client. + while (orb->work_pending ()) + orb->perform_work (); + } + + orb->destroy (); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught in client main ():"); + return -1; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\nClient finished successfully\n"))); + return 0; +} diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/run_test.pl b/TAO/orbsvcs/tests/Miop/McastFragmentation/run_test.pl new file mode 100755 index 00000000000..764f22f07d6 --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/run_test.pl @@ -0,0 +1,201 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib "$ENV{ACE_ROOT}/bin"; +use PerlACE::TestTarget; + +$status = 0; +$server_level = '0'; +$client_level = '0'; + +$orbs = 10; +$payload = 3000; +$threads = 10; +$count = 10; + +foreach $i (@ARGV) { + if ($i eq '-debug') { + $server_level = '3'; + # Level enough for reporting errors. + $client_level = '9'; + } +} + +my $server = PerlACE::TestTarget::create_target (1) || die "Create target 1 failed\n"; +my $client1 = PerlACE::TestTarget::create_target (2) || die "Create target 2 failed\n"; +my $client2 = PerlACE::TestTarget::create_target (3) || die "Create target 3 failed\n"; +my $client3 = PerlACE::TestTarget::create_target (4) || die "Create target 4 failed\n"; + +$uipmc = "corbaloc:miop:1.0\@1.0-domain-1/225.10.10.10:" . $server->RandomPort(); + +my $c_svcconf = "uipmc_client$PerlACE::svcconf_ext"; +my $s_svcconf = "uipmc_server$PerlACE::svcconf_ext"; +my $cleanup_select = 0; #int rand (3); +if ($cleanup_select == 1) { + $s_svcconf = "uipmc_server_n$PerlACE::svcconf_ext"; +} elsif ($cleanup_select == 2) { + $s_svcconf = "uipmc_server_m$PerlACE::svcconf_ext"; +} +print STDOUT "Using $s_svcconf for the server initialization\n"; + +my $iorbase = "server.ior"; +my $server_iorfile = $server->LocalFile ($iorbase); +my $server_svcconf = $server->LocalFile ($s_svcconf); +my $client1_iorfile = $client1->LocalFile ($iorbase); +my $client1_svcconf = $client1->LocalFile ($c_svcconf); +my $client2_iorfile = $client2->LocalFile ($iorbase); +my $client2_svcconf = $client2->LocalFile ($c_svcconf); +my $client3_iorfile = $client3->LocalFile ($iorbase); +my $client3_svcconf = $client3->LocalFile ($c_svcconf); +$server->DeleteFile($iorbase); +$client1->DeleteFile($iorbase); +$client2->DeleteFile($iorbase); +$client3->DeleteFile($iorbase); + +$SV = $server->CreateProcess ("server", + "-ORBdebuglevel $server_level " . + "-ORBSvcConf $server_svcconf " . + "-ORBRcvSock 500000 " . + "-o $server_iorfile -u $uipmc -s $orbs " . + "-p $payload -t " . + $threads * 1 . " -c $count"); +$CL1 = $client1->CreateProcess ("client", + "-ORBSvcConf $client1_svcconf " . + "-ORBDebugLevel $client_level " . + "-k file://$client1_iorfile -p $payload " . + "-t $threads -c $count -f 0"); +=COMMENT +$CL2 = $client2->CreateProcess ("client", + "-ORBSvcConf $client2_svcconf " . + "-k file://$client2_iorfile -p $payload " . + "-t $threads -c $count -f $threads"); +$CL3 = $client3->CreateProcess ("client", + "-ORBSvcConf $client3_svcconf " . + "-k file://$client3_iorfile -p $payload " . + "-t $threads -c $count " . + "-f " . $threads * 2); +=cut + +$server_status = $SV->Spawn (); + +if ($server_status != 0) { + print STDERR "ERROR: server returned $server_status\n"; + exit 1; +} + +if ($server->WaitForFileTimed ($iorbase, + $server->ProcessStartWaitInterval()) == -1) { + print STDERR "ERROR: cannot find file <$server_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +if ($server->GetFile ($iorbase) == -1) { + print STDERR "ERROR: cannot retrieve file <$server_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} +if ($client1->PutFile ($iorbase) == -1) { + print STDERR "ERROR: cannot set file <$client1_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} +if ($client2->PutFile ($iorbase) == -1) { + print STDERR "ERROR: cannot set file <$client2_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} +if ($client3->PutFile ($iorbase) == -1) { + print STDERR "ERROR: cannot set file <$client3_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +$client_status = $CL1->Spawn (); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +=COMMENT +$client_status = $CL2->Spawn (); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + $CL1->Kill (); $CL1->TimedWait (1); + exit 1; +} + +$client_status = $CL3->Spawn (); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + $CL1->Kill (); $CL1->TimedWait (1); + $CL2->Kill (); $CL2->TimedWait (1); + exit 1; +} +=cut + +$client_status = $CL1->WaitKill ($client1->ProcessStopWaitInterval() + + $count * $threads); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + # $CL2->Kill (); $CL2->TimedWait (1); + # $CL3->Kill (); $CL3->TimedWait (1); + exit 1; +} + +=COMMENT +$client_status = $CL2->WaitKill ($client2->ProcessStopWaitInterval() + + $count * $threads); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + $CL3->Kill (); $CL3->TimedWait (1); + exit 1; +} + +$client_status = $CL3->WaitKill ($client3->ProcessStopWaitInterval() + + $count * $threads); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} +=cut + +# Shutdown the server. +$CL1->Arguments ("-ORBSvcConf $client1_svcconf -k file://$client1_iorfile -x"); +$client_status = $CL1->Spawn (); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +$client_status = $CL1->WaitKill ($client1->ProcessStopWaitInterval()); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +$server_status = $SV->WaitKill ($server->ProcessStopWaitInterval()); + +if ($server_status != 0) { + print STDERR "ERROR: server returned $server_status\n"; + $status = 1; +} + +$server->DeleteFile($iorbase); +$client1->DeleteFile($iorbase); +$client2->DeleteFile($iorbase); +$client3->DeleteFile($iorbase); + +exit $status; diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/run_test_ipv6.pl b/TAO/orbsvcs/tests/Miop/McastFragmentation/run_test_ipv6.pl new file mode 100755 index 00000000000..4ca7bae0cd5 --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/run_test_ipv6.pl @@ -0,0 +1,201 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib "$ENV{ACE_ROOT}/bin"; +use PerlACE::TestTarget; + +$status = 0; +$server_level = '0'; +$client_level = '0'; + +$orbs = 10; +$payload = 300000; +$threads = 10; +$count = 10; + +foreach $i (@ARGV) { + if ($i eq '-debug') { + $server_level = '9'; + # Level enough for reporting errors. + $client_level = '3'; + } +} + +my $server = PerlACE::TestTarget::create_target (1) || die "Create target 1 failed\n"; +my $client1 = PerlACE::TestTarget::create_target (2) || die "Create target 2 failed\n"; +my $client2 = PerlACE::TestTarget::create_target (3) || die "Create target 3 failed\n"; +my $client3 = PerlACE::TestTarget::create_target (4) || die "Create target 4 failed\n"; + +$uipmc = "corbaloc:miop:1.0\@1.0-domain-1/[FF01:0:0:0:0:10:10:10]:" . $server->RandomPort(); + +my $c_svcconf = "uipmc_client$PerlACE::svcconf_ext"; +my $s_svcconf = "uipmc_server$PerlACE::svcconf_ext"; +my $cleanup_select = int rand (3); +if ($cleanup_select == 1) { + $s_svcconf = "uipmc_server_n$PerlACE::svcconf_ext"; +} elsif ($cleanup_select == 2) { + $s_svcconf = "uipmc_server_m$PerlACE::svcconf_ext"; +} +print STDOUT "Using $s_svcconf for the server initialization\n"; + +my $iorbase = "server.ior"; +my $server_iorfile = $server->LocalFile ($iorbase); +my $server_svcconf = $server->LocalFile ($s_svcconf); +my $client1_iorfile = $client1->LocalFile ($iorbase); +my $client1_svcconf = $client1->LocalFile ($c_svcconf); +my $client2_iorfile = $client2->LocalFile ($iorbase); +my $client2_svcconf = $client2->LocalFile ($c_svcconf); +my $client3_iorfile = $client3->LocalFile ($iorbase); +my $client3_svcconf = $client3->LocalFile ($c_svcconf); +$server->DeleteFile($iorbase); +$client1->DeleteFile($iorbase); +$client2->DeleteFile($iorbase); +$client3->DeleteFile($iorbase); + +$SV = $server->CreateProcess ("server", + "-ORBdebuglevel $server_level " . + "-ORBSvcConf $server_svcconf " . + "-ORBRcvSock 500000 " . + "-o $server_iorfile -u $uipmc -s $orbs " . + "-p $payload -t " . + $threads * 1 . " -c $count"); +$CL1 = $client1->CreateProcess ("client", + "-ORBSvcConf $client1_svcconf " . + "-ORBDebugLevel $client_level " . + "-k file://$client1_iorfile -p $payload " . + "-t $threads -c $count -f 0"); +=COMMENT +$CL2 = $client2->CreateProcess ("client", + "-ORBSvcConf $client2_svcconf " . + "-k file://$client2_iorfile -p $payload " . + "-t $threads -c $count -f $threads"); +$CL3 = $client3->CreateProcess ("client", + "-ORBSvcConf $client3_svcconf " . + "-k file://$client3_iorfile -p $payload " . + "-t $threads -c $count " . + "-f " . $threads * 2); +=cut + +$server_status = $SV->Spawn (); + +if ($server_status != 0) { + print STDERR "ERROR: server returned $server_status\n"; + exit 1; +} + +if ($server->WaitForFileTimed ($iorbase, + $server->ProcessStartWaitInterval()) == -1) { + print STDERR "ERROR: cannot find file <$server_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +if ($server->GetFile ($iorbase) == -1) { + print STDERR "ERROR: cannot retrieve file <$server_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} +if ($client1->PutFile ($iorbase) == -1) { + print STDERR "ERROR: cannot set file <$client1_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} +if ($client2->PutFile ($iorbase) == -1) { + print STDERR "ERROR: cannot set file <$client2_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} +if ($client3->PutFile ($iorbase) == -1) { + print STDERR "ERROR: cannot set file <$client3_iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +$client_status = $CL1->Spawn (); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +=COMMENT +$client_status = $CL2->Spawn (); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + $CL1->Kill (); $CL1->TimedWait (1); + exit 1; +} + +$client_status = $CL3->Spawn (); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + $CL1->Kill (); $CL1->TimedWait (1); + $CL2->Kill (); $CL2->TimedWait (1); + exit 1; +} +=cut + +$client_status = $CL1->WaitKill ($client1->ProcessStopWaitInterval() + + $count * $threads); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + # $CL2->Kill (); $CL2->TimedWait (1); + # $CL3->Kill (); $CL3->TimedWait (1); + exit 1; +} + +=COMMENT +$client_status = $CL2->WaitKill ($client2->ProcessStopWaitInterval() + + $count * $threads); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + $CL3->Kill (); $CL3->TimedWait (1); + exit 1; +} + +$client_status = $CL3->WaitKill ($client3->ProcessStopWaitInterval() + + $count * $threads); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} +=cut + +# Shutdown the server. +$CL1->Arguments ("-ORBSvcConf $client1_svcconf -k file://$client1_iorfile -x"); +$client_status = $CL1->Spawn (); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +$client_status = $CL1->WaitKill ($client1->ProcessStopWaitInterval()); +if ($client_status != 0) { + print STDERR "ERROR: client returned $client_status\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +$server_status = $SV->WaitKill ($server->ProcessStopWaitInterval()); + +if ($server_status != 0) { + print STDERR "ERROR: server returned $server_status\n"; + $status = 1; +} + +$server->DeleteFile($iorbase); +$client1->DeleteFile($iorbase); +$client2->DeleteFile($iorbase); +$client3->DeleteFile($iorbase); + +exit $status; diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/server.cpp b/TAO/orbsvcs/tests/Miop/McastFragmentation/server.cpp new file mode 100644 index 00000000000..8cd9bffcf6f --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/server.cpp @@ -0,0 +1,201 @@ +// +// $Id$ +// + +#include "ace/Task.h" +#include "ace/Get_Opt.h" +#include "orbsvcs/PortableGroup/GOA.h" +#include "Hello_Impl.h" + +ACE_TCHAR const *uipmc_url = + ACE_TEXT ("corbaloc:miop:1.0@1.0-test-1/225.1.1.8:32158"); +ACE_TCHAR const *ior_output_file = ACE_TEXT ("test.ior"); +CORBA::ULong orb_threads = 10; +CORBA::ULong payload_length = 1000; +CORBA::ULong client_threads = 5; +CORBA::ULong payload_calls = 100; + +int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, ACE_TEXT ("o:u:s:p:t:c:")); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'o': + ior_output_file = get_opts.opt_arg (); + break; + + case 'u': + uipmc_url = get_opts.opt_arg (); + break; + + case 's': + orb_threads = ACE_OS::strtoul (get_opts.opt_arg (), 0, 10); + break; + + case 'p': + payload_length = ACE_OS::strtoul (get_opts.opt_arg (), 0, 10); + break; + + case 't': + client_threads = ACE_OS::strtoul (get_opts.opt_arg (), 0, 10); + break; + + case 'c': + payload_calls = ACE_OS::strtoul (get_opts.opt_arg (), 0, 10); + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("usage: %s ") + ACE_TEXT ("-o <iorfile> ") + ACE_TEXT ("-u <uipmc_url> ") + ACE_TEXT ("-s <orb_threads> ") + ACE_TEXT ("-p <payload_length> ") + ACE_TEXT ("-t <client_threads> ") + ACE_TEXT ("-c <payload_calls>") + ACE_TEXT ("\n"), + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line. + return 0; +} + +class OrbThread : public ACE_Task_Base +{ +public: + OrbThread (CORBA::ORB_ptr orb) + : orb_ (CORBA::ORB::_duplicate (orb)) + { + } + + virtual int svc (void) + { + try + { + this->orb_->run (); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught in OrbThread:"); + return -1; + } + + return 0; + } + +private: + CORBA::ORB_var orb_; +}; + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + try + { + CORBA::ORB_var orb = CORBA::ORB_init (argc, argv); + + if (parse_args (argc, argv) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("ERROR: wrong arguments\n")), + -1); + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA"); + + PortableGroup::GOA_var root_goa = + PortableGroup::GOA::_narrow (poa_object.in ()); + + if (CORBA::is_nil (root_goa.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("ERROR: nil RootPOA\n")), + -1); + + PortableServer::POAManager_var poa_manager = root_goa->the_POAManager (); + + // Create UIPMC reference. + CORBA::Object_var obj = orb->string_to_object (uipmc_url); + + // Create id. + PortableServer::ObjectId_var id = + root_goa->create_id_for_reference (obj.in ()); + + // Activate UIPMC Object. + UIPMC_Object_Impl* uipmc_impl; + ACE_NEW_RETURN (uipmc_impl, + UIPMC_Object_Impl (payload_length, + client_threads, + payload_calls), + -1); + PortableServer::ServantBase_var owner_transfer1 (uipmc_impl); + root_goa->activate_object_with_id (id.in (), uipmc_impl); + + Test::UIPMC_Object_var uipmc_obj = + Test::UIPMC_Object::_unchecked_narrow (obj.in ()); + + if (CORBA::is_nil (uipmc_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("ERROR: nil Hello object\n")), + -1); + CORBA::String_var ior = orb->object_to_string (obj.in ()); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("MIOP object is <%C>\n"), ior.in ())); + + Hello_Impl* hello_impl; + ACE_NEW_RETURN (hello_impl, + Hello_Impl (orb.in (), uipmc_obj.in ()), + -1); + PortableServer::ServantBase_var owner_transfer2 (hello_impl); + + obj = hello_impl->_this (); + + ior = orb->object_to_string (obj.in ()); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Activated as <%C>\n"), ior.in ())); + + // If the ior_output_file exists, output the ior to it. + if (ior_output_file != 0) + { + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + + if (output_file == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Cannot open output file ") + ACE_TEXT ("for writing IOR: %s"), + ior_output_file), + -1); + } + + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + } + + poa_manager->activate (); + + { + // start clients + OrbThread orb_thr (orb.in ()); + orb_thr.activate (THR_NEW_LWP | THR_JOINABLE, orb_threads); + orb_thr.wait (); + } + + root_goa->destroy (1, 1); + + orb->destroy (); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught in server main ():"); + return -1; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\nServer finished successfully.\n"))); + return 0; +} diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_client.conf b/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_client.conf new file mode 100644 index 00000000000..9b6c361dc96 --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_client.conf @@ -0,0 +1,6 @@ +dynamic UIPMC_Factory Service_Object * TAO_PortableGroup:_make_TAO_UIPMC_Protocol_Factory() "" +static Resource_Factory "-ORBProtocolFactory IIOP_Factory -ORBProtocolFactory UIPMC_Factory" + +dynamic PortableGroup_Loader Service_Object * TAO_PortableGroup:_make_TAO_PortableGroup_Loader() "" + +dynamic MIOP_Resource_Factory Service_Object * TAO_PortableGroup:_make_TAO_MIOP_Resource_Factory() "-ORBMaxFragmentSize 1400 -ORBMaxFragmentRate 10000 -ORBSendHighWaterMark 2800" diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_server.conf b/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_server.conf new file mode 100644 index 00000000000..48bc9f81cfe --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_server.conf @@ -0,0 +1,6 @@ +dynamic UIPMC_Factory Service_Object * TAO_PortableGroup:_make_TAO_UIPMC_Protocol_Factory() "" +static Resource_Factory "-ORBProtocolFactory IIOP_Factory -ORBProtocolFactory UIPMC_Factory" + +dynamic PortableGroup_Loader Service_Object * TAO_PortableGroup:_make_TAO_PortableGroup_Loader() "" + +dynamic MIOP_Resource_Factory Service_Object * TAO_PortableGroup:_make_TAO_MIOP_Resource_Factory() "-ORBFragmentsCleanupStrategy delay -ORBFragmentsCleanupBound 10000" # 10 seconds diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_server_m.conf b/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_server_m.conf new file mode 100644 index 00000000000..fc9e8267fb2 --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_server_m.conf @@ -0,0 +1,6 @@ +dynamic UIPMC_Factory Service_Object * TAO_PortableGroup:_make_TAO_UIPMC_Protocol_Factory() "" +static Resource_Factory "-ORBProtocolFactory IIOP_Factory -ORBProtocolFactory UIPMC_Factory" + +dynamic PortableGroup_Loader Service_Object * TAO_PortableGroup:_make_TAO_PortableGroup_Loader() "" + +dynamic MIOP_Resource_Factory Service_Object * TAO_PortableGroup:_make_TAO_MIOP_Resource_Factory() "-ORBFragmentsCleanupStrategy memory -ORBFragmentsCleanupBound 20000000" # ~20MB diff --git a/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_server_n.conf b/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_server_n.conf new file mode 100644 index 00000000000..c34a1912cb9 --- /dev/null +++ b/TAO/orbsvcs/tests/Miop/McastFragmentation/uipmc_server_n.conf @@ -0,0 +1,6 @@ +dynamic UIPMC_Factory Service_Object * TAO_PortableGroup:_make_TAO_UIPMC_Protocol_Factory() "" +static Resource_Factory "-ORBProtocolFactory IIOP_Factory -ORBProtocolFactory UIPMC_Factory" + +dynamic PortableGroup_Loader Service_Object * TAO_PortableGroup:_make_TAO_PortableGroup_Loader() "" + +dynamic MIOP_Resource_Factory Service_Object * TAO_PortableGroup:_make_TAO_MIOP_Resource_Factory() "-ORBFragmentsCleanupStrategy number -ORBFragmentsCleanupBound 50" # 50 messages diff --git a/TAO/tao/Connection_Handler.cpp b/TAO/tao/Connection_Handler.cpp index ff9fbfb745a..0030af8cb4b 100644 --- a/TAO/tao/Connection_Handler.cpp +++ b/TAO/tao/Connection_Handler.cpp @@ -61,10 +61,16 @@ TAO_Connection_Handler::set_socket_option (ACE_SOCK &sock, && sock.set_option (SOL_SOCKET, SO_SNDBUF, (void *) &snd_size, - sizeof (snd_size)) == -1 - && errno != ENOTSUP) + sizeof (snd_size)) == -1) { - return -1; + if (TAO_debug_level) + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - Connection_Handler::") + ACE_TEXT ("set_socket_option, setting SO_SNDBUF failed ") + ACE_TEXT ("'%m'\n"))); + + if (errno != ENOTSUP) + return -1; } #endif /* !ACE_LACKS_SO_SNDBUF */ @@ -73,16 +79,22 @@ TAO_Connection_Handler::set_socket_option (ACE_SOCK &sock, && sock.set_option (SOL_SOCKET, SO_RCVBUF, (void *) &rcv_size, - sizeof (int)) == -1 - && errno != ENOTSUP) + sizeof (int)) == -1) { - return -1; + if (TAO_debug_level) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - Connection_Handler::") + ACE_TEXT ("set_socket_option, setting SO_RCVBUF failed ") + ACE_TEXT ("'%m'\n"))); + + if (errno != ENOTSUP) + return -1; } #endif /* !ACE_LACKS_SO_RCVBUF */ #if defined (ACE_LACKS_SO_SNDBUF) && defined (ACE_LACKS_SO_RCVBUF) - ACE_UNUSED_ARG (snd_size); - ACE_UNUSED_ARG (rcv_size); + ACE_UNUSED_ARG (snd_size); + ACE_UNUSED_ARG (rcv_size); #endif // Set the close-on-exec flag for that file descriptor. If the |