summaryrefslogtreecommitdiff
path: root/ACE/TAO/orbsvcs/tests/Event/Mcast/Complex/consumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/TAO/orbsvcs/tests/Event/Mcast/Complex/consumer.cpp')
-rw-r--r--ACE/TAO/orbsvcs/tests/Event/Mcast/Complex/consumer.cpp203
1 files changed, 203 insertions, 0 deletions
diff --git a/ACE/TAO/orbsvcs/tests/Event/Mcast/Complex/consumer.cpp b/ACE/TAO/orbsvcs/tests/Event/Mcast/Complex/consumer.cpp
new file mode 100644
index 00000000000..e2d4ebd7219
--- /dev/null
+++ b/ACE/TAO/orbsvcs/tests/Event/Mcast/Complex/consumer.cpp
@@ -0,0 +1,203 @@
+// $Id$
+
+#include "Constants.h"
+#include "orbsvcs/Event/EC_Lifetime_Utils_T.h"
+#include "orbsvcs/Event_Utilities.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+#include "orbsvcs/RtecEventCommS.h"
+
+class EC_Consumer:
+ public POA_RtecEventComm::PushConsumer
+{
+public:
+
+ /// Constructor.
+ EC_Consumer (CORBA::ORB_var orb,
+ RtecEventChannelAdmin::EventChannel_var ec);
+
+ /// PushConsumer methods.
+ //@{
+ /// Logs each event. Initiates shutdown after receiving 100 events
+ /// of each type.
+ virtual void push (const RtecEventComm::EventSet &events);
+
+ /// No-op.
+ virtual void disconnect_push_consumer (void);
+
+private:
+
+ /// Helper - destroys EC, shutdowns the ORB and prints number of
+ /// events received.
+ void disconnect (void);
+
+ /// Number of events of different types pushed to us by EC.
+ //@{
+ size_t a_events_;
+ size_t b_events_;
+ size_t c_events_;
+ //@}
+
+ /// Cache these pointers for cleanup.
+ CORBA::ORB_var orb_;
+ RtecEventChannelAdmin::EventChannel_var ec_;
+};
+
+EC_Consumer::EC_Consumer (CORBA::ORB_var orb,
+ RtecEventChannelAdmin::EventChannel_var ec)
+ : a_events_ (0),
+ b_events_ (0),
+ c_events_ (0),
+ orb_ (orb),
+ ec_ (ec)
+{
+}
+
+void
+EC_Consumer::push (const RtecEventComm::EventSet &events)
+{
+ for (CORBA::ULong i = 0; i < events.length (); ++i)
+ {
+ switch (events[i].header.type)
+ {
+ case A_EVENT_TYPE:
+ ++this->a_events_;
+ ACE_DEBUG ((LM_DEBUG, " Received event A\n"));
+ break;
+
+ case B_EVENT_TYPE:
+ ++this->b_events_;
+ ACE_DEBUG ((LM_DEBUG, " Received event B\n"));
+ break;
+
+ case C_EVENT_TYPE:
+ ++this->c_events_;
+ ACE_DEBUG ((LM_DEBUG, " Received event C\n"));
+ break;
+
+ default:
+ ACE_DEBUG ((LM_DEBUG, " Received event of UNKNOWN event type\n"));
+ }
+ }
+
+ if (this->a_events_ >= 100
+ && this->b_events_ >= 100
+ && this->c_events_ >= 100)
+ this->disconnect ();
+}
+
+void
+EC_Consumer::disconnect_push_consumer (void)
+{
+}
+
+void
+EC_Consumer::disconnect (void)
+{
+ if (this->a_events_ == 100
+ && this->b_events_ == 100
+ && this->c_events_ == 100)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "SUCCESS:\n"
+ " Received 100 events of each type "
+ "(A, B, and C), as expected\n"));
+ }
+
+ this->ec_->destroy ();
+
+ this->orb_->shutdown (0);
+}
+
+////////////////////////////////////////////////////////////
+int
+check_for_nil (CORBA::Object_ptr obj, const char *message)
+{
+ if (CORBA::is_nil (obj))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ERROR: Object reference <%s> is nil\n",
+ message),
+ -1);
+ else
+ return 0;
+}
+
+int
+parse_args (int /* argc */, ACE_TCHAR ** /* argv */)
+{
+ return 0;
+}
+
+int
+ACE_TMAIN(int argc, ACE_TCHAR *argv[])
+{
+ try
+ {
+ // Initialize ORB and POA, POA Manager, parse args.
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv);
+
+ if (parse_args (argc, argv) == -1)
+ return 1;
+
+ CORBA::Object_var obj =
+ orb->resolve_initial_references ("RootPOA");
+ PortableServer::POA_var poa =
+ PortableServer::POA::_narrow (obj.in ());
+ if (check_for_nil (poa.in (), "POA") == -1)
+ return 1;
+
+ PortableServer::POAManager_var manager =
+ poa->the_POAManager ();
+
+ // Obtain reference to EC.
+ obj = orb->resolve_initial_references ("Event_Service");
+ RtecEventChannelAdmin::EventChannel_var ec =
+ RtecEventChannelAdmin::EventChannel::_narrow (obj.in ());
+ if (check_for_nil (ec.in (), "EC") == -1)
+ return 1;
+
+ // Create the consumer and register it with POA.
+ TAO_EC_Servant_Var<EC_Consumer> consumer_impl =
+ new EC_Consumer (orb, ec);
+
+ if (!consumer_impl.in ())
+ return -1;
+
+ RtecEventComm::PushConsumer_var consumer;
+ TAO_EC_Object_Deactivator consumer_deactivator;
+ activate (consumer,
+ poa.in (),
+ consumer_impl.in (),
+ consumer_deactivator);
+ consumer_deactivator.disallow_deactivation ();
+
+ // Obtain reference to ConsumerAdmin.
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers ();
+
+ // Obtain ProxyPushSupplier and connect this consumer.
+ RtecEventChannelAdmin::ProxyPushSupplier_var supplier =
+ consumer_admin->obtain_push_supplier ();
+
+ ACE_ConsumerQOS_Factory qos;
+ qos.start_disjunction_group (3);
+ qos.insert_type (A_EVENT_TYPE, 0);
+ qos.insert_type (B_EVENT_TYPE, 0);
+ qos.insert_type (C_EVENT_TYPE, 0);
+ supplier->connect_push_consumer (consumer.in (),
+ qos.get_ConsumerQOS ());
+
+ // Allow processing of CORBA requests.
+ manager->activate ();
+
+ // Receive events from EC.
+ orb->run ();
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Exception in Consumer:");
+ return 1;
+ }
+
+ return 0;
+}