summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/Notify/Basic/Sequence.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/Notify/Basic/Sequence.cpp')
-rw-r--r--TAO/orbsvcs/tests/Notify/Basic/Sequence.cpp387
1 files changed, 387 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Notify/Basic/Sequence.cpp b/TAO/orbsvcs/tests/Notify/Basic/Sequence.cpp
new file mode 100644
index 00000000000..7842be55e57
--- /dev/null
+++ b/TAO/orbsvcs/tests/Notify/Basic/Sequence.cpp
@@ -0,0 +1,387 @@
+// $Id$
+
+#include "ace/Arg_Shifter.h"
+#include "ace/Get_Opt.h"
+#include "ace/OS_NS_unistd.h"
+#include "tao/debug.h"
+#include "Sequence.h"
+
+ACE_RCSID (Notify_Tests, Sequence, "$Id$")
+
+/***************************************************************************/
+
+SequencePushConsumer::SequencePushConsumer (Sequence *test_client)
+ : test_client_ (test_client)
+{
+}
+
+void
+SequencePushConsumer::push_structured_events (
+ const CosNotification::EventBatch &batch
+ ACE_ENV_ARG_DECL_NOT_USED
+ )
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ CosEventComm::Disconnected))
+{
+ this->test_client_->events_received_ += batch.length ();
+
+ if (batch.length () > this->test_client_->consumer_batch_size_)
+ ACE_DEBUG ((LM_ERROR,
+ "Error: Received more than max event batch %d\n",
+ batch.length ()));
+
+ this->test_client_->on_event_received ();
+
+ ACE_OS::sleep (this->test_client_->consumer_delay_);
+}
+
+/***************************************************************************/
+
+SequencePushSupplier::SequencePushSupplier (
+ Sequence* test_client
+ )
+ : test_client_ (test_client)
+{
+}
+
+SequencePushSupplier::~SequencePushSupplier (void)
+{
+}
+
+/***************************************************************************/
+Sequence::Sequence (void)
+ : event_count_ (15), supplier_batch_size_ (5), consumer_batch_size_ (3),
+ pacing_ (2), order_policy_ (CosNotification::PriorityOrder), events_received_ (0),
+ consumer_delay_ (1)
+{
+}
+
+Sequence::~Sequence (void)
+{
+}
+
+int
+Sequence::init (int argc,
+ char* argv []
+ ACE_ENV_ARG_DECL)
+{
+ if (TAO_debug_level)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Options: event count = %d \n"
+ "supplier batch size = %d \n"
+ "consumer batch size = %d \n"
+ "pacing = %d secs \n"
+ , event_count_
+ , supplier_batch_size_
+ , consumer_batch_size_
+ , pacing_));
+
+ ACE_DEBUG ((LM_DEBUG, "consumer delay = %d\n", consumer_delay_.sec ()));
+ }
+
+ // Initialize the base class.
+ Notify_Test_Client::init (argc,
+ argv
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ // Create all participents.
+ this->create_EC (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ CosNotifyChannelAdmin::AdminID adminid;
+
+ this->supplier_admin_ =
+ this->ec_->new_for_suppliers (this->ifgop_,
+ adminid
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));
+
+ this->consumer_admin_ =
+ this->ec_->new_for_consumers (this->ifgop_,
+ adminid
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ()));
+
+ ACE_NEW_RETURN (this->consumer_,
+ SequencePushConsumer (this),
+ -1);
+ this->consumer_->init (root_poa_.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ this->consumer_->connect (this->consumer_admin_.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ CosNotification::QoSProperties properties (3);
+ properties.length (3);
+
+ properties[0].name = CORBA::string_dup (CosNotification::MaximumBatchSize);
+ properties[0].value <<= (CORBA::Long) this->consumer_batch_size_;
+ properties[1].name = CORBA::string_dup (CosNotification::PacingInterval);
+ properties[1].value <<= (TimeBase::TimeT) (this->pacing_ * 1000 * 10000);
+ properties[2].name = CORBA::string_dup (CosNotification::OrderPolicy);
+ properties[2].value <<= this->order_policy_;
+
+ this->consumer_->get_proxy_supplier ()->set_qos (properties);
+
+ ACE_NEW_RETURN (this->supplier_,
+ SequencePushSupplier (this),
+ -1);
+ this->supplier_->init (root_poa_.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ this->supplier_->connect (this->supplier_admin_.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ consumer_start( 0 );
+
+ return 0;
+}
+
+int
+Sequence::parse_args (int argc,
+ char *argv[])
+{
+ ACE_Arg_Shifter arg_shifter (argc,
+ argv);
+ const char *current_arg = 0;
+
+ while (arg_shifter.is_anything_left ())
+ {
+ if ((current_arg = arg_shifter.get_the_parameter ("-events")))
+ {
+ this->event_count_ = ACE_OS::atoi (current_arg); // The number of events to send/receive.
+
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-SupplierBatchSize")))
+ {
+ this->supplier_batch_size_ = ACE_OS::atoi (current_arg); // Supplier batch size
+
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-ConsumerBatchSize")))
+ {
+ this->consumer_batch_size_ = ACE_OS::atoi (current_arg); // Consumer batch size
+
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-ConsumerDelay")))
+ {
+ this->consumer_delay_ = ACE_Time_Value (ACE_OS::atoi (current_arg), 0); // Consumer delay in secs.
+
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-Pacing"))) // in seconds
+ {
+ this->pacing_ = (TimeBase::TimeT) ACE_OS::atoi (current_arg);
+
+ arg_shifter.consume_arg ();
+ }
+
+ else if (arg_shifter.cur_arg_strncasecmp ("-?") == 0)
+ {
+ ACE_DEBUG((LM_DEBUG,
+ "usage: %s "
+ "-events event_count "
+ "-SupplierBatchSize size "
+ "-ConsumerBatchSize size "
+ "-ConsumerDelay delay "
+ "-Pacing pacing \n",
+ argv[0], argv[0]));
+
+ arg_shifter.consume_arg ();
+
+ return -1;
+ }
+ else
+ {
+ arg_shifter.ignore_arg ();
+ }
+ }
+
+ return 0;
+}
+
+void
+Sequence::create_EC (ACE_ENV_SINGLE_ARG_DECL)
+{
+ CosNotifyChannelAdmin::ChannelID id;
+
+ this->ec_ = notify_factory_->create_channel (this->initial_qos_,
+ this->initial_admin_,
+ id
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ACE_ASSERT (!CORBA::is_nil (this->ec_.in ()));
+}
+
+void
+Sequence::on_event_received (void)
+{
+ if (TAO_debug_level)
+ ACE_DEBUG ((LM_DEBUG,
+ "Events received = %d\n",
+ this->events_received_.value ()));
+
+ if (this->events_received_.value () == this->event_count_)
+ {
+ ACE_DECLARE_NEW_CORBA_ENV;
+ this->end_test (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+}
+
+void
+Sequence::run_test (ACE_ENV_SINGLE_ARG_DECL)
+{
+ // operations:
+ CosNotification::StructuredEvent event;
+
+ // EventHeader.
+
+ // FixedEventHeader.
+ // EventType.
+ // string.
+ event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*");
+ // string
+ event.header.fixed_header.event_type.type_name = CORBA::string_dup("*");
+ // string
+ event.header.fixed_header.event_name = CORBA::string_dup("myevent");
+
+ // OptionalHeaderFields.
+ // PropertySeq.
+ // sequence<Property>: string name, any value
+ CosNotification::PropertySeq& qos = event.header.variable_header;
+ qos.length (1); // put nothing here
+
+ // FilterableEventBody
+ // PropertySeq
+ // sequence<Property>: string name, any value
+ event.filterable_data.length (3);
+ event.filterable_data[0].name = CORBA::string_dup("threshold");
+
+ event.filterable_data[1].name = CORBA::string_dup("temperature");
+ event.filterable_data[1].value <<= (CORBA::Long)70;
+
+ event.filterable_data[2].name = CORBA::string_dup("pressure");
+ event.filterable_data[2].value <<= (CORBA::Long)80;
+
+ CORBA::Short prio = CosNotification::LowestPriority;
+
+ CosNotification::EventBatch batch;
+ batch.length (this->supplier_batch_size_);
+ CORBA::ULong batch_index = 0;
+
+ for (int i = 0; i < this->event_count_; ++i)
+ {
+ event.filterable_data[0].value <<= (CORBA::Long)i;
+
+ // any
+ event.remainder_of_body <<= (CORBA::Long)i;
+
+ qos[0].name = CORBA::string_dup (CosNotification::Priority);
+ qos[0].value <<= (CORBA::Short)prio++;
+
+ batch[batch_index] = event;
+ batch_index++;
+
+ if (batch_index == this->supplier_batch_size_)
+ {
+ batch.length (batch_index); // set the correct length
+
+ if (TAO_debug_level)
+ ACE_DEBUG ((LM_DEBUG, "Sending batch with %d events\n", batch.length ()));
+
+ this->supplier_->send_events (batch
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // reset
+ batch.length (this->supplier_batch_size_);
+ batch_index = 0;
+ }
+ } // for
+
+ // send the last batch.
+ if (batch_index > 0)
+ {
+ batch.length (batch_index); // set the correct length
+
+ this->supplier_->send_events (batch
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+}
+
+void
+Sequence::end_test (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+{
+ consumer_done( 0 );
+}
+
+int
+Sequence::check_results (void)
+{
+ // Destroy the channel.
+ ACE_DECLARE_NEW_CORBA_ENV;
+ this->ec_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (this->events_received_.value () == this->event_count_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Sequence test success\n"));
+ return 0;
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Sequence test failed!\n"));
+ return 1;
+ }
+}
+
+/***************************************************************************/
+
+int
+main (int argc, char* argv[])
+{
+ Sequence events;
+
+ if (events.parse_args (argc, argv) == -1)
+ {
+ return 1;
+ }
+
+ ACE_TRY_NEW_ENV
+ {
+ events.init (argc,
+ argv
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ events.run_test (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ events.ORB_run( ACE_ENV_SINGLE_ARG_PARAMETER );
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH (CORBA::Exception, se)
+ {
+ ACE_PRINT_EXCEPTION (se, "Error: ");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return events.check_results ();
+}