summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp162
1 files changed, 162 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
new file mode 100644
index 00000000000..615c364ffef
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
@@ -0,0 +1,162 @@
+// $Id$
+
+#include "orbsvcs/Notify/Any/PushConsumer.h"
+
+ACE_RCSID (Notify,
+ TAO_Notify_PushConsumer,
+ "$Id$")
+
+#include "ace/Bound_Ptr.h"
+#include "tao/Stub.h" // For debug messages printing out ORBid.
+#include "tao/ORB_Core.h"
+#include "orbsvcs/CosEventCommC.h"
+#include "orbsvcs/Notify/Event.h"
+#include "orbsvcs/Notify/Properties.h"
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+TAO_Notify_PushConsumer::TAO_Notify_PushConsumer (TAO_Notify_ProxySupplier* proxy)
+ :TAO_Notify_Consumer (proxy)
+{
+}
+
+TAO_Notify_PushConsumer::~TAO_Notify_PushConsumer ()
+{
+}
+
+void
+TAO_Notify_PushConsumer::init (CosEventComm::PushConsumer_ptr push_consumer)
+{
+ // Initialize only once
+ ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
+
+ // push_consumer not optional
+ if (CORBA::is_nil (push_consumer))
+ {
+ throw CORBA::BAD_PARAM();
+ }
+
+ try
+ {
+ if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
+ {
+ this->push_consumer_ = CosEventComm::PushConsumer::_duplicate (push_consumer);
+
+ this->publish_ =
+ CosNotifyComm::NotifyPublish::_narrow (push_consumer);
+ }
+ else
+ {
+ // "Port" consumer's object reference from receiving ORB to dispatching ORB.
+ CORBA::String_var temp =
+ TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer);
+
+ CORBA::Object_var obj =
+ TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in());
+
+ CosEventComm::PushConsumer_var new_cos_comm_pc =
+ CosEventComm::PushConsumer::_unchecked_narrow(obj.in());
+
+ this->push_consumer_ =
+ CosEventComm::PushConsumer::_duplicate (new_cos_comm_pc.in());
+
+ //
+ // Note that here we do an _unchecked_narrow() in order to avoid
+ // making a call on the consumer b/c the consumer may not have activated
+ // its POA just yet. That means that before we use this reference the first
+ // time, we'll actually need to call _is_a() on it, i.e., the equivalent
+ // of an _narrow(). At the time of this writing, the only use of
+ // this->publish_ is in TAO_NS_Consumer::dispatch_updates_i (the superclass).
+ // If any other use is made of this data member, then the code to validate
+ // the actual type of the target object must be refactored.
+ this->publish_ =
+ CosNotifyComm::NotifyPublish::_unchecked_narrow (obj.in());
+
+
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Any push init dispatching ORB id is %s.\n",
+ obj->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+ }
+ }
+ catch (const CORBA::TRANSIENT& ex)
+ {
+ ex._tao_print_exception ("Got a TRANSIENT in NS_PushConsumer::init");
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_PushConsumer %@\n", this));
+ }
+ catch (const CORBA::Exception&)
+ {
+ // _narrow failed which probably means the interface is CosEventComm type.
+ }
+}
+
+void
+TAO_Notify_PushConsumer::release (void)
+{
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_Notify_PushConsumer::push (const CORBA::Any& payload)
+{
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10) {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Any push dispatching ORB id is %s.\n",
+ this->push_consumer_->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+
+ this->push_consumer_->push (payload);
+}
+
+void
+TAO_Notify_PushConsumer::push (const CosNotification::StructuredEvent& event)
+{
+ CORBA::Any any;
+
+ TAO_Notify_Event::translate (event, any);
+
+ this->push_consumer_->push (any);
+}
+
+/// Push a batch of events to this consumer.
+void
+TAO_Notify_PushConsumer::push (const CosNotification::EventBatch& event)
+{
+ ACE_ASSERT(false);
+ ACE_UNUSED_ARG (event);
+ // TODO exception?
+}
+
+ACE_CString
+TAO_Notify_PushConsumer::get_ior (void) const
+{
+ ACE_CString result;
+ CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance()->orb();
+ try
+ {
+ CORBA::String_var ior = orb->object_to_string(this->push_consumer_.in());
+ result = static_cast<const char*> (ior.in ());
+ }
+ catch (const CORBA::Exception&)
+ {
+ result.fast_clear();
+ }
+ return result;
+}
+
+void
+TAO_Notify_PushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer)
+{
+ TAO_Notify_PushConsumer* tmp =
+ dynamic_cast<TAO_Notify_PushConsumer*> (old_consumer);
+ ACE_ASSERT(tmp != 0);
+ this->init(tmp->push_consumer_.in());
+ this->schedule_timer(false);
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL