summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/examples/Notify/Lanes/Supplier.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/examples/Notify/Lanes/Supplier.cpp')
-rw-r--r--TAO/orbsvcs/examples/Notify/Lanes/Supplier.cpp185
1 files changed, 185 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/Notify/Lanes/Supplier.cpp b/TAO/orbsvcs/examples/Notify/Lanes/Supplier.cpp
new file mode 100644
index 00000000000..8b3a2a50862
--- /dev/null
+++ b/TAO/orbsvcs/examples/Notify/Lanes/Supplier.cpp
@@ -0,0 +1,185 @@
+// $Id$
+
+#include "Supplier.h"
+
+ACE_RCSID (Notify, TAO_Notify_Lanes_Supplier, "$Id$")
+
+#include "tao/ORB_Core.h"
+
+TAO_Notify_Lanes_Supplier::TAO_Notify_Lanes_Supplier (TAO_Notify_ORB_Objects& orb_objects)
+ : orb_objects_ (orb_objects)
+ , proxy_consumer_id_ (0)
+ , expected_consumer_count_ (2)
+ , consumers_connected_ (lock_)
+ , consumer_count_ (0)
+{
+}
+
+TAO_Notify_Lanes_Supplier::~TAO_Notify_Lanes_Supplier ()
+{
+}
+
+void
+TAO_Notify_Lanes_Supplier::init (CosNotifyChannelAdmin::SupplierAdmin_var& admin, int expected_consumer_count ACE_ENV_ARG_DECL)
+{
+ // First initialize the class members.
+ this->admin_ = admin;
+ this->expected_consumer_count_ = expected_consumer_count;
+
+ this->connect (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+void
+TAO_Notify_Lanes_Supplier::run (ACE_ENV_SINGLE_ARG_DECL)
+{
+ // The Priority at which we send the first event to the first consumer.
+ RTCORBA::Priority priority = 1;
+
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+
+ ACE_DEBUG ((LM_DEBUG, "(%P, %t) Waiting for %d consumers to connect...\n", this->expected_consumer_count_));
+
+ // Wait till the consumers are ready to go.
+ while (this->consumer_count_ != this->expected_consumer_count_)
+ this->consumers_connected_.wait ();
+ }
+
+ // Send an event each to each consumer.
+ // Each Consumer expects a different priority.
+ for (int i = 0; i < this->expected_consumer_count_; ++i, ++priority)
+ {
+ // Set this threads priority.
+ this->orb_objects_.current_->the_priority (priority ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Make sure the priority was set, get the priority of the current thread.
+ RTCORBA::Priority thread_priority =
+ this->orb_objects_.current_->the_priority (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // We will send this event.
+ CosNotification::StructuredEvent event;
+
+ // Populate the Priority field so that the consumer can deduce the suppliers priority
+ // to do a sanity check when it receives the event.
+ CosNotification::PropertySeq& opt = event.header.variable_header;
+ opt.length (1);
+
+ CORBA::Any buffer;
+ buffer <<= (CORBA::Short) thread_priority;
+
+ opt[0].name = CORBA::string_dup (CosNotification::Priority);
+ opt[0].value = buffer;
+
+ // Set the domain and type nams in the event's fixed header.
+ char type[BUFSIZ];
+ ACE_OS::sprintf (type, "TEST_TYPE_%d", thread_priority);
+
+ event.header.fixed_header.event_type.domain_name = CORBA::string_dup("TEST_DOMAIN");
+ event.header.fixed_header.event_type.type_name = CORBA::string_dup(type);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P, %t) Supplier is sending an event of type %s at priority %d\n", type, thread_priority));
+
+ // send the event
+ this->send_event (event ACE_ENV_ARG_PARAMETER);
+ } // repeat for the next consumer at the next priority.
+
+ // Disconnect from the EC
+ this->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ // Deactivate this object.
+ this->deactivate (ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ // we're done. shutdown the ORB to exit the process.
+ this->orb_objects_.orb_->shutdown (1);
+}
+
+void
+TAO_Notify_Lanes_Supplier::connect (ACE_ENV_SINGLE_ARG_DECL)
+{
+ // Activate the supplier object.
+ CosNotifyComm::StructuredPushSupplier_var objref = this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Obtain the proxy.
+ CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer =
+ this->admin_->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT
+ , proxy_consumer_id_ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ()));
+
+ // narrow
+ this->proxy_consumer_ =
+ CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ()));
+
+ // connect to the proxyconsumer.
+ proxy_consumer_->connect_structured_push_supplier (objref.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void
+TAO_Notify_Lanes_Supplier::disconnect (ACE_ENV_SINGLE_ARG_DECL)
+{
+ ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
+
+ this->proxy_consumer_->disconnect_structured_push_consumer(ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+void
+TAO_Notify_Lanes_Supplier::deactivate (ACE_ENV_SINGLE_ARG_DECL)
+{
+ PortableServer::POA_var poa (this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER));
+ ACE_CHECK;
+
+ PortableServer::ObjectId_var id (poa->servant_to_id (this
+ ACE_ENV_ARG_PARAMETER));
+ ACE_CHECK;
+
+ poa->deactivate_object (id.in()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void
+TAO_Notify_Lanes_Supplier::subscription_change (const CosNotification::EventTypeSeq & added,
+ const CosNotification::EventTypeSeq & /*removed */
+ ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException,
+ CosNotifyComm::InvalidEventType
+ ))
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+
+ // Count the number of consumers connect and signal the supplier thread when the expected count have connected.
+ if (added.length () > 0)
+ {
+ if (++this->consumer_count_ == this->expected_consumer_count_)
+ this->consumers_connected_.signal ();
+ }
+}
+
+void
+TAO_Notify_Lanes_Supplier::send_event (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL)
+{
+ ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
+
+ proxy_consumer_->push_structured_event (event ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void
+TAO_Notify_Lanes_Supplier::disconnect_structured_push_supplier (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ))
+{
+ this->deactivate (ACE_ENV_SINGLE_ARG_PARAMETER);
+}