diff options
Diffstat (limited to 'TAO/orbsvcs/examples/Notify/Subscribe/Subscribe.cpp')
-rw-r--r-- | TAO/orbsvcs/examples/Notify/Subscribe/Subscribe.cpp | 471 |
1 files changed, 471 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/Notify/Subscribe/Subscribe.cpp b/TAO/orbsvcs/examples/Notify/Subscribe/Subscribe.cpp new file mode 100644 index 00000000000..a7a0e96183f --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/Subscribe/Subscribe.cpp @@ -0,0 +1,471 @@ +/* -*- C++ -*- $Id$ */ +#include "Subscribe.h" + +ACE_RCSID(Notify, Subscribe, "$Id$") + +#define NOTIFY_FACTORY_NAME "NotifyEventChannelFactory" +#define NAMING_SERVICE_NAME "NameService" + +#define DOMAIN_A "domain_a" +#define DOMAIN_B "domain_b" +#define DOMAIN_C "domain_c" + +#define TYPE_A "type_a" +#define TYPE_B "type_b" +#define TYPE_C "type_c" + +#define EVENT_COUNT 4 // number of events we expect the consumer to get from the EC + + ACE_Atomic_Op <TAO_SYNCH_MUTEX, int> g_result_count = 0; // we wait for 4 events. + +Subscribe::Subscribe (void) + : done_ (0) +{ + // No-Op. + ifgop_ = CosNotifyChannelAdmin::OR_OP; +} + +Subscribe::~Subscribe () +{ + this->ec_->destroy (); +} + +void +Subscribe::init (int argc, char *argv [] ACE_ENV_ARG_DECL) +{ + init_ORB (argc, argv ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + resolve_naming_service (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + resolve_Notify_factory (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + create_EC (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + create_supplieradmin (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + create_consumeradmin (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + create_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + create_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe::run (ACE_ENV_SINGLE_ARG_DECL) +{ + this->send_events (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + if (g_result_count != EVENT_COUNT) // if we still need to wait for events, run the orb. + { // if we still need to wait for events, run the orb. + while (!this->done_) + if (this->orb_->work_pending ()) + this->orb_->perform_work (); + } +} + +void +Subscribe::done (void) +{ + this->done_ = 1; +} + +void +Subscribe::init_ORB (int argc, + char *argv [] + ACE_ENV_ARG_DECL) +{ + this->orb_ = CORBA::ORB_init (argc, + argv, + "" + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + CORBA::Object_ptr poa_object = + this->orb_->resolve_initial_references("RootPOA" + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + if (CORBA::is_nil (poa_object)) + { + ACE_ERROR ((LM_ERROR, + " (%P|%t) Unable to initialize the POA.\n")); + return; + } + this->root_poa_ = + PortableServer::POA::_narrow (poa_object ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + PortableServer::POAManager_var poa_manager = + root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe::resolve_naming_service (ACE_ENV_SINGLE_ARG_DECL) +{ + CORBA::Object_var naming_obj = + this->orb_->resolve_initial_references (NAMING_SERVICE_NAME + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Need to check return value for errors. + if (CORBA::is_nil (naming_obj.in ())) + ACE_THROW (CORBA::UNKNOWN ()); + + this->naming_context_ = + CosNaming::NamingContext::_narrow (naming_obj.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe::resolve_Notify_factory (ACE_ENV_SINGLE_ARG_DECL) +{ + CosNaming::Name name (1); + name.length (1); + name[0].id = CORBA::string_dup (NOTIFY_FACTORY_NAME); + + CORBA::Object_var obj = + this->naming_context_->resolve (name + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->notify_factory_ = + CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe::create_EC (ACE_ENV_SINGLE_ARG_DECL) +{ + CosNotifyChannelAdmin::ChannelID id; + + ec_ = notify_factory_->create_channel (initial_qos_, + initial_admin_, + id + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_ASSERT (!CORBA::is_nil (ec_.in ())); +} + +void +Subscribe::create_supplieradmin (ACE_ENV_SINGLE_ARG_DECL) +{ + CosNotifyChannelAdmin::AdminID adminid; + + supplier_admin_ = + ec_->new_for_suppliers (this->ifgop_, adminid ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ())); +} + +void +Subscribe:: create_consumeradmin (ACE_ENV_SINGLE_ARG_DECL) +{ + CosNotifyChannelAdmin::AdminID adminid; + + consumer_admin_ = + ec_->new_for_consumers (this->ifgop_, adminid ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ())); +} + +void +Subscribe::create_consumers (ACE_ENV_SINGLE_ARG_DECL) +{ + consumer_1_ = new Subscribe_StructuredPushConsumer (this); + consumer_1_->connect (this->consumer_admin_.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + consumer_2_ = new Subscribe_StructuredPushConsumer (this); + consumer_2_->connect (this->consumer_admin_.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe::create_suppliers (ACE_ENV_SINGLE_ARG_DECL) +{ + supplier_1_ = new Subscribe_StructuredPushSupplier (); + supplier_1_->connect (this->supplier_admin_.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + supplier_2_ = new Subscribe_StructuredPushSupplier (); + supplier_2_->connect (this->supplier_admin_.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe::send_events (ACE_ENV_SINGLE_ARG_DECL) +{ + // Setup the CA to receive event_type : "domain_A", "Type_a" + CosNotification::EventTypeSeq added(1); + CosNotification::EventTypeSeq removed (0); + added.length (1); + + added[0].domain_name = CORBA::string_dup (DOMAIN_A); + added[0].type_name = CORBA::string_dup (TYPE_A); + + this->consumer_admin_->subscription_change (added, removed ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Setup the Consumer 1 to receive event_type : "domain_B", "Type_b" + CosNotification::EventTypeSeq added_1(1); + CosNotification::EventTypeSeq removed_1 (0); + + added_1[0].domain_name = CORBA::string_dup (DOMAIN_B); + added_1[0].type_name = CORBA::string_dup (TYPE_B); + added_1.length (1); + removed_1.length (0); + + this->consumer_1_->get_proxy_supplier ()->subscription_change (added_1, removed_1 + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Setup the Consumer 2 to receive event_type : "domain_C", "Type_c" + CosNotification::EventTypeSeq added_2(1); + CosNotification::EventTypeSeq removed_2 (0); + + added_2[0].domain_name = CORBA::string_dup (DOMAIN_C); + added_2[0].type_name = CORBA::string_dup (TYPE_C); + added_2.length (1); + removed_2.length (0); + + this->consumer_2_->get_proxy_supplier ()->subscription_change (added_2, removed_2 + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Create the events - one of each type + // Event 1 + CosNotification::StructuredEvent event1; + event1.header.fixed_header.event_type.domain_name = + CORBA::string_dup(DOMAIN_A); + event1.header.fixed_header.event_type.type_name = + CORBA::string_dup(TYPE_A); + event1.header.fixed_header.event_name = CORBA::string_dup(""); + event1.header.variable_header.length (0); // put nothing here + event1.filterable_data.length (0); + event1.remainder_of_body <<= (CORBA::Long)10; + + // Event 2 + CosNotification::StructuredEvent event2; + event2.header.fixed_header.event_type.domain_name = + CORBA::string_dup(DOMAIN_B); + event2.header.fixed_header.event_type.type_name = + CORBA::string_dup(TYPE_B); + event2.header.fixed_header.event_name = CORBA::string_dup(""); + event2.header.variable_header.length (0); // put nothing here + event2.filterable_data.length (0); + event2.remainder_of_body <<= (CORBA::Long)10; + + // event 3 + CosNotification::StructuredEvent event3; + event3.header.fixed_header.event_type.domain_name = + CORBA::string_dup(DOMAIN_C); + event3.header.fixed_header.event_type.type_name = + CORBA::string_dup(TYPE_C); + event3.header.fixed_header.event_name = CORBA::string_dup(""); + event3.header.variable_header.length (0); // put nothing here + event3.filterable_data.length (0); + event3.remainder_of_body <<= (CORBA::Long)10; + + // let supplier 1 send all these events + for (int i = 0; i < 1; ++i) + { + supplier_1_->send_event (event1 ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + supplier_1_->send_event (event2 ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + supplier_1_->send_event (event3 ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } +} + +/*****************************************************************/ +Subscribe_StructuredPushConsumer::Subscribe_StructuredPushConsumer (Subscribe* subscribe) + : subscribe_ (subscribe) +{ +} + +Subscribe_StructuredPushConsumer::~Subscribe_StructuredPushConsumer () +{ +} + +void +Subscribe_StructuredPushConsumer::connect + (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin + ACE_ENV_ARG_DECL) +{ + // Activate the consumer with the default_POA_ + CosNotifyComm::StructuredPushConsumer_var objref = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + CosNotifyChannelAdmin::ProxySupplier_var proxysupplier = + consumer_admin->obtain_notification_push_supplier (CosNotifyChannelAdmin::STRUCTURED_EVENT, proxy_supplier_id_ ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_ASSERT (!CORBA::is_nil (proxysupplier.in ())); + + // narrow + this->proxy_supplier_ = + CosNotifyChannelAdmin::StructuredProxyPushSupplier:: + _narrow (proxysupplier.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_ASSERT (!CORBA::is_nil (proxy_supplier_.in ())); + + proxy_supplier_->connect_structured_push_consumer (objref.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe_StructuredPushConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) +{ + this->proxy_supplier_-> + disconnect_structured_push_supplier(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe_StructuredPushConsumer::offer_change + (const CosNotification::EventTypeSeq & /*added*/, + const CosNotification::EventTypeSeq & /*removed*/ + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyComm::InvalidEventType + )) +{ + // No-Op. +} + +void +Subscribe_StructuredPushConsumer::push_structured_event + (const CosNotification::StructuredEvent & notification + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventComm::Disconnected + )) +{ + const char* domain_name = + notification.header.fixed_header.event_type.domain_name; + + const char* type_name = + notification.header.fixed_header.event_type.type_name; + + ACE_DEBUG ((LM_DEBUG, "Structured Subscribe Consumer %d received event, domain = %s, type = %s\n", this->proxy_supplier_id_, domain_name, type_name)); + + if (++g_result_count == EVENT_COUNT) + subscribe_->done (); + +} + +void +Subscribe_StructuredPushConsumer::disconnect_structured_push_consumer + (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + // No-Op. +} + +CosNotifyChannelAdmin::StructuredProxyPushSupplier_ptr +Subscribe_StructuredPushConsumer::get_proxy_supplier (void) +{ + return proxy_supplier_.in (); +} + +/*****************************************************************/ + +Subscribe_StructuredPushSupplier::Subscribe_StructuredPushSupplier (void) +{ +} + +Subscribe_StructuredPushSupplier::~Subscribe_StructuredPushSupplier () +{ +} + +void +Subscribe_StructuredPushSupplier::connect + (CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin + ACE_ENV_ARG_DECL) +{ + CosNotifyComm::StructuredPushSupplier_var objref = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer = + supplier_admin->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT, proxy_consumer_id_ ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ())); + + // narrow + this->proxy_consumer_ = + CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ())); + + proxy_consumer_->connect_structured_push_supplier (objref.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe_StructuredPushSupplier::disconnect (ACE_ENV_SINGLE_ARG_DECL) +{ + ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ())); + + this->proxy_consumer_->disconnect_structured_push_consumer(ACE_ENV_SINGLE_ARG_PARAMETER); +} + +void +Subscribe_StructuredPushSupplier::subscription_change + (const CosNotification::EventTypeSeq & /*added*/, + const CosNotification::EventTypeSeq & /*removed */ + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosNotifyComm::InvalidEventType + )) +{ + //No-Op. +} + +void +Subscribe_StructuredPushSupplier::send_event + (const CosNotification::StructuredEvent& event + ACE_ENV_ARG_DECL) +{ + ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ())); + + proxy_consumer_->push_structured_event (event ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +void +Subscribe_StructuredPushSupplier::disconnect_structured_push_supplier + (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + // No-Op. +} + |