diff options
Diffstat (limited to 'TAO/tests/Oneway_Buffering/client.cpp')
-rw-r--r-- | TAO/tests/Oneway_Buffering/client.cpp | 812 |
1 files changed, 812 insertions, 0 deletions
diff --git a/TAO/tests/Oneway_Buffering/client.cpp b/TAO/tests/Oneway_Buffering/client.cpp new file mode 100644 index 00000000000..0c4383016ef --- /dev/null +++ b/TAO/tests/Oneway_Buffering/client.cpp @@ -0,0 +1,812 @@ +// $Id$ + +#include "TestC.h" +#include "tao/Messaging/Messaging.h" +#include "tao/AnyTypeCode/Any.h" +#include "tao/TAOC.h" +#include "tao/AnyTypeCode/TAOA.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_sys_time.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID(Oneway_Buffering, client, "$Id$") + +const char *server_ior = "file://server.ior"; +const char *admin_ior = "file://admin.ior"; +int iterations = 20; + +int run_message_count_test = 0; +int run_timeout_test = 0; +int run_timeout_reactive_test = 0; +int run_buffer_size_test = 0; + +const int PAYLOAD_LENGTH = 1024; +const int BUFFERED_MESSAGES_COUNT = 10; +const unsigned int TIMEOUT_MILLISECONDS = 50; +const int BUFFER_SIZE = 10 * PAYLOAD_LENGTH; + +/// Check that no more than 10% of the messages are not sent. +const double LIVENESS_TOLERANCE = 0.9; + +/// Limit the depth of the liveness test, avoid blowing up the stack +/// on the server +const int LIVENESS_MAX_DEPTH = 256; + +/// Factor in GIOP overhead in the buffer size test +const double GIOP_OVERHEAD = 0.9; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "k:a:i:ctbr"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'k': + server_ior = get_opts.opt_arg (); + break; + + case 'a': + admin_ior = get_opts.opt_arg (); + break; + + case 'i': + iterations = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'c': + run_message_count_test = 1; + break; + + case 't': + run_timeout_test = 1; + break; + + case 'b': + run_buffer_size_test = 1; + break; + + case 'r': + run_timeout_reactive_test = 1; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-k <server_ior> " + "-a <admin_ior> " + "-i <iterations> " + "<-c|-t|-b|-r> " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +int +run_message_count (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin + ACE_ENV_ARG_DECL); + +int +run_timeout (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin + ACE_ENV_ARG_DECL); + +int +run_timeout_reactive (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin + ACE_ENV_ARG_DECL); + +int +run_buffer_size (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin + ACE_ENV_ARG_DECL); + +int +main (int argc, char *argv[]) +{ + int test_failed = 0; + ACE_TRY_NEW_ENV + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) != 0) + return 1; + + CORBA::Object_var tmp = + orb->string_to_object(server_ior ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + Test::Oneway_Buffering_var oneway_buffering = + Test::Oneway_Buffering::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (oneway_buffering.in ())) + { + ACE_ERROR_RETURN ((LM_DEBUG, + "Nil Test::Oneway_Buffering reference <%s>\n", + server_ior), + 1); + } + + tmp = + orb->string_to_object(admin_ior ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + Test::Oneway_Buffering_Admin_var oneway_buffering_admin = + Test::Oneway_Buffering_Admin::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (oneway_buffering_admin.in ())) + { + ACE_ERROR_RETURN ((LM_DEBUG, + "Nil Test::Oneway_Buffering_Admin reference <%s>\n", + admin_ior), + 1); + } + + if (run_message_count_test) + { + ACE_DEBUG ((LM_DEBUG, + "Running message count flushing test\n")); + test_failed = + run_message_count (orb.in (), + oneway_buffering.in (), + oneway_buffering_admin.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + else if (run_timeout_test) + { + ACE_DEBUG ((LM_DEBUG, + "Running timeout flushing test\n")); + test_failed = + run_timeout (orb.in (), + oneway_buffering.in (), + oneway_buffering_admin.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + else if (run_timeout_reactive_test) + { + ACE_DEBUG ((LM_DEBUG, + "Running timeout (reactive) flushing test\n")); + test_failed = + run_timeout_reactive (orb.in (), + oneway_buffering.in (), + oneway_buffering_admin.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + else if (run_buffer_size_test) + { + ACE_DEBUG ((LM_DEBUG, + "Running buffer size flushing test\n")); + test_failed = + run_buffer_size (orb.in (), + oneway_buffering.in (), + oneway_buffering_admin.in () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + else + { + ACE_ERROR ((LM_ERROR, + "ERROR: No test was configured\n")); + } + + oneway_buffering->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_OS::sleep(1); + oneway_buffering_admin->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception caught in client:"); + return 1; + } + ACE_ENDTRY; + + return test_failed; +} + +int +configure_policies (CORBA::ORB_ptr orb, + const TAO::BufferingConstraint &buffering_constraint, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_out flusher + ACE_ENV_ARG_DECL) +{ + CORBA::Object_var object = + orb->resolve_initial_references ("PolicyCurrent" ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + CORBA::PolicyCurrent_var policy_current = + CORBA::PolicyCurrent::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (CORBA::is_nil (policy_current.in ())) + { + ACE_ERROR ((LM_ERROR, "ERROR: Nil policy current\n")); + return 1; + } + CORBA::Any scope_as_any; + scope_as_any <<= Messaging::SYNC_NONE; + + CORBA::Any buffering_as_any; + buffering_as_any <<= buffering_constraint; + + CORBA::PolicyList policies (2); policies.length (2); + policies[0] = + orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, + scope_as_any + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + policies[1] = + orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE, + buffering_as_any + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + policy_current->set_policy_overrides (policies, CORBA::ADD_OVERRIDE + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + policies[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + policies[1]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + TAO::BufferingConstraint flush_constraint; + flush_constraint.mode = TAO::BUFFER_FLUSH; + flush_constraint.message_count = 0; + flush_constraint.message_bytes = 0; + flush_constraint.timeout = 0; + + buffering_as_any <<= flush_constraint; + policies.length (1); + policies[0] = + orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE, + buffering_as_any + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + object = + oneway_buffering->_set_policy_overrides (policies, + CORBA::ADD_OVERRIDE + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + policies[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + flusher = + Test::Oneway_Buffering::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + return 0; +} + +void +sync_server (Test::Oneway_Buffering_ptr flusher + ACE_ENV_ARG_DECL) +{ + // Get back in sync with the server... + flusher->flush (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + flusher->sync (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; +} + +int +run_liveness_test (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_ptr flusher, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin + ACE_ENV_ARG_DECL) +{ + ACE_DEBUG ((LM_DEBUG, ".... checking for liveness\n")); + int test_failed = 0; + + sync_server (flusher ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + CORBA::ULong send_count = + oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + int liveness_test_iterations = int(send_count); + + Test::Payload payload (PAYLOAD_LENGTH); + payload.length (PAYLOAD_LENGTH); + for (int j = 0; j != PAYLOAD_LENGTH; ++j) + payload[j] = CORBA::Octet(j % 256); + + int depth = 0; + for (int i = 0; i != liveness_test_iterations; ++i) + { + oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + send_count++; + + CORBA::ULong receive_count = + oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + // Once the system has sent enough messages we don't + // expect it to fall too far behind, i.e. at least 90% of the + // messages should be delivered.... + CORBA::ULong expected = + CORBA::ULong (LIVENESS_TOLERANCE * send_count); + + if (receive_count < expected) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d " + "not enough messages received %u " + "expected %u\n", + i, receive_count, expected)); + + sync_server (flusher ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + } + + if (depth++ == LIVENESS_MAX_DEPTH) + { + sync_server (flusher ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + depth = 0; + } + + } + + return test_failed; +} + +int +run_message_count (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin + ACE_ENV_ARG_DECL) +{ + TAO::BufferingConstraint buffering_constraint; + buffering_constraint.mode = TAO::BUFFER_MESSAGE_COUNT; + buffering_constraint.message_count = BUFFERED_MESSAGES_COUNT; + buffering_constraint.message_bytes = 0; + buffering_constraint.timeout = 0; + + Test::Oneway_Buffering_var flusher; + int test_failed = + configure_policies (orb, buffering_constraint, + oneway_buffering, flusher.out () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (test_failed != 0) + return test_failed; + + Test::Payload payload (PAYLOAD_LENGTH); + payload.length (PAYLOAD_LENGTH); + for (int j = 0; j != PAYLOAD_LENGTH; ++j) + payload[j] = CORBA::Octet(j % 256); + + CORBA::ULong send_count = 0; + for (int i = 0; i != iterations; ++i) + { + sync_server (flusher.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + CORBA::ULong initial_receive_count = + oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (initial_receive_count != send_count) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d message lost (%u != %u)\n", + i, initial_receive_count, send_count)); + } + + while (1) + { + oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + send_count++; + + CORBA::ULong receive_count = + oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + CORBA::ULong iteration_count = + send_count - initial_receive_count; + if (receive_count != initial_receive_count) + { + if (iteration_count < CORBA::ULong(BUFFERED_MESSAGES_COUNT)) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d flush before " + "message count reached. " + "Iteration count = %u, Threshold = %u\n", + i, + iteration_count, BUFFERED_MESSAGES_COUNT)); + } + break; + } + + if (iteration_count > 3 * BUFFERED_MESSAGES_COUNT) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d no flush past " + "message count threshold. " + "Iteration count = %u, Threshold = %u\n", + i, + iteration_count, BUFFERED_MESSAGES_COUNT)); + break; + } + } + } + + int liveness_test_failed = + run_liveness_test (orb, + oneway_buffering, + flusher.in (), + oneway_buffering_admin + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (liveness_test_failed) + test_failed = 1; + + return test_failed; +} + +int +run_timeout (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin + ACE_ENV_ARG_DECL) +{ + TAO::BufferingConstraint buffering_constraint; + buffering_constraint.mode = TAO::BUFFER_TIMEOUT; + buffering_constraint.message_count = 0; + buffering_constraint.message_bytes = 0; + buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000; + + Test::Oneway_Buffering_var flusher; + int test_failed = + configure_policies (orb, buffering_constraint, + oneway_buffering, flusher.out () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (test_failed != 0) + return test_failed; + + Test::Payload payload (PAYLOAD_LENGTH); + payload.length (PAYLOAD_LENGTH); + for (int j = 0; j != PAYLOAD_LENGTH; ++j) + payload[j] = CORBA::Octet(j % 256); + + CORBA::ULong send_count = 0; + for (int i = 0; i != iterations; ++i) + { + sync_server (flusher.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + CORBA::ULong initial_receive_count = + oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (initial_receive_count != send_count) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d message lost (%u != %u)\n", + i, initial_receive_count, send_count)); + } + + ACE_Time_Value start = ACE_OS::gettimeofday (); + while (1) + { + oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + send_count++; + + CORBA::ULong receive_count = + oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start; + if (receive_count != initial_receive_count) + { + if (elapsed.msec () < TIMEOUT_MILLISECONDS) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d flush before " + "timeout expired. " + "Elapsed = %d, Timeout = %d msecs\n", + i, + elapsed.msec (), TIMEOUT_MILLISECONDS)); + } + // terminate the while loop. + break; + } + + if (elapsed.msec () > 3 * TIMEOUT_MILLISECONDS) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d no flush past " + "timeout threshold. " + "Elapsed = %d, Timeout = %d msecs\n", + i, + elapsed.msec (), TIMEOUT_MILLISECONDS)); + break; + } + } + } + + int liveness_test_failed = + run_liveness_test (orb, oneway_buffering, + flusher.in (), + oneway_buffering_admin + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (liveness_test_failed) + test_failed = 1; + + + return test_failed; +} + +int +run_timeout_reactive (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin + ACE_ENV_ARG_DECL) +{ + TAO::BufferingConstraint buffering_constraint; + buffering_constraint.mode = TAO::BUFFER_TIMEOUT; + buffering_constraint.message_count = 0; + buffering_constraint.message_bytes = 0; + buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000; + + Test::Oneway_Buffering_var flusher; + int test_failed = + configure_policies (orb, buffering_constraint, + oneway_buffering, flusher.out () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (test_failed != 0) + return test_failed; + + Test::Payload payload (PAYLOAD_LENGTH); + payload.length (PAYLOAD_LENGTH); + for (int j = 0; j != PAYLOAD_LENGTH; ++j) + payload[j] = CORBA::Octet(j % 256); + + CORBA::ULong send_count = 0; + for (int i = 0; i != iterations; ++i) + { + sync_server (flusher.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + CORBA::ULong initial_receive_count = + oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (initial_receive_count != send_count) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d message lost (%u != %u)\n", + i, initial_receive_count, send_count)); + } + + ACE_Time_Value start = ACE_OS::gettimeofday (); + for (int j = 0; j != 20; ++j) + { + oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + send_count++; + } + while (1) + { + CORBA::ULong receive_count = + oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + + ACE_CHECK_RETURN (-1); + + ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start; + if (receive_count != initial_receive_count) + { + if (elapsed.msec () < TIMEOUT_MILLISECONDS) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d flush before " + "timeout expired. " + "Elapsed = %d, Timeout = %d msecs\n", + i, + elapsed.msec (), TIMEOUT_MILLISECONDS)); + } + // terminate the while loop. + break; + } + + if (elapsed.msec () > 3 * TIMEOUT_MILLISECONDS) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d no flush past " + "timeout threshold. " + "Elapsed = %d, Timeout = %d msecs\n", + i, + elapsed.msec (), TIMEOUT_MILLISECONDS)); + break; + } + } + } + + int liveness_test_failed = + run_liveness_test (orb, oneway_buffering, + flusher.in (), + oneway_buffering_admin + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (liveness_test_failed) + test_failed = 1; + + + return test_failed; +} + +int +run_buffer_size (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin + ACE_ENV_ARG_DECL) +{ + TAO::BufferingConstraint buffering_constraint; + buffering_constraint.mode = TAO::BUFFER_MESSAGE_BYTES; + buffering_constraint.message_count = 0; + buffering_constraint.message_bytes = BUFFER_SIZE; + buffering_constraint.timeout = 0; + + Test::Oneway_Buffering_var flusher; + int test_failed = + configure_policies (orb, buffering_constraint, + oneway_buffering, flusher.out () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (test_failed != 0) + return test_failed; + + Test::Payload payload (PAYLOAD_LENGTH); + payload.length (PAYLOAD_LENGTH); + for (int j = 0; j != PAYLOAD_LENGTH; ++j) + payload[j] = CORBA::Octet(j % 256); + + CORBA::ULong bytes_sent = 0; + for (int i = 0; i != iterations; ++i) + { + sync_server (flusher.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + CORBA::ULong initial_bytes_received = + oneway_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (initial_bytes_received != bytes_sent) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d data lost (%u != %u)\n", + i, initial_bytes_received, bytes_sent)); + } + + while (1) + { + oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + bytes_sent += PAYLOAD_LENGTH; + + CORBA::ULong bytes_received = + oneway_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + CORBA::ULong payload_delta = + bytes_sent - initial_bytes_received; + if (bytes_received != initial_bytes_received) + { + // The queue has been flushed, check that enough data + // has been sent. The check cannot be precise because + // the ORB counts the GIOP message overhead, in this + // test we assume the overhead to be less than 10% + + if (payload_delta < CORBA::ULong (GIOP_OVERHEAD * BUFFER_SIZE)) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d flush before " + "minimum buffer size was reached. " + "Sent = %u, Minimum buffer = %u bytes\n", + i, + payload_delta, BUFFER_SIZE)); + } + break; + } + + if (payload_delta > 3 * BUFFER_SIZE) + { + test_failed = 1; + ACE_DEBUG ((LM_DEBUG, + "DEBUG: Iteration %d no flush past " + "buffer size threshold. " + "Sent = %u, Minimum buffer = %u bytes\n", + i, + payload_delta, BUFFER_SIZE)); + break; + } + } + } + + int liveness_test_failed = + run_liveness_test (orb, oneway_buffering, + flusher.in (), + oneway_buffering_admin + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (liveness_test_failed) + test_failed = 1; + + return test_failed; +} |