summaryrefslogtreecommitdiff
path: root/ACE/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp')
-rw-r--r--ACE/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp189
1 files changed, 189 insertions, 0 deletions
diff --git a/ACE/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp b/ACE/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
new file mode 100644
index 00000000000..efea0f8111c
--- /dev/null
+++ b/ACE/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
@@ -0,0 +1,189 @@
+// $Id$
+#include "orbsvcs/Notify/Structured/StructuredPushConsumer.h"
+
+ACE_RCSID(RT_Notify, TAO_Notify_StructuredPushConsumer, "$Id$")
+
+#include "ace/Bound_Ptr.h"
+#include "tao/Stub.h" // For debug messages printing out ORBid.
+#include "tao/ORB_Core.h"
+#include "orbsvcs/Notify/Properties.h"
+#include "orbsvcs/Notify/Event.h"
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+TAO_Notify_StructuredPushConsumer::TAO_Notify_StructuredPushConsumer (TAO_Notify_ProxySupplier* proxy)
+ :TAO_Notify_Consumer (proxy), connection_valid(0)
+{
+}
+
+TAO_Notify_StructuredPushConsumer::~TAO_Notify_StructuredPushConsumer ()
+{
+}
+
+void
+TAO_Notify_StructuredPushConsumer::init (CosNotifyComm::StructuredPushConsumer_ptr push_consumer)
+{
+ // Initialize only once
+ ACE_ASSERT( CORBA::is_nil (this->push_consumer_.in()) );
+
+ if (CORBA::is_nil (push_consumer))
+ {
+ throw CORBA::BAD_PARAM();
+ }
+
+ if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
+ {
+ this->push_consumer_ = CosNotifyComm::StructuredPushConsumer::_duplicate (push_consumer);
+ this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+ }
+ else
+ {
+ // "Port" consumer's object reference from receiving ORB to dispatching ORB.
+ CORBA::String_var temp =
+ TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer);
+
+ CORBA::Object_var obj =
+ TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in());
+
+ try
+ {
+ CosNotifyComm::StructuredPushConsumer_var new_push_consumer =
+ CosNotifyComm::StructuredPushConsumer::_unchecked_narrow(obj.in());
+
+ this->push_consumer_ = CosNotifyComm::StructuredPushConsumer::_duplicate (new_push_consumer.in());
+ this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer.in());
+
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Structured push init dispatching ORB id is %s.\n",
+ obj->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+ }
+ catch (const CORBA::TRANSIENT& ex)
+ {
+ ex._tao_print_exception (
+ "Got a TRANSIENT in NS_StructuredPushConsumer::init");
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_StructuredPushConsumer %@\n", this));
+ }
+ catch (const CORBA::Exception&)
+ {
+ // _narrow failed
+ }
+ }
+}
+
+void
+TAO_Notify_StructuredPushConsumer::release (void)
+{
+ delete this;
+ //@@ inform factory
+}
+
+void
+TAO_Notify_StructuredPushConsumer::push (const CORBA::Any& event)
+{
+ CosNotification::StructuredEvent notification;
+
+ TAO_Notify_Event::translate (event, notification);
+
+ // Check if we have to validate connection
+ if ( !connection_valid ) {
+ try
+ {
+ CORBA::PolicyList_var inconsistent_policies;
+ this->push_consumer_->_validate_connection (inconsistent_policies.out());
+ }
+ catch (const CORBA::COMM_FAILURE&)
+ {
+ // Expected exception when a bad connection is found
+ if (TAO_debug_level >= 1)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Found bad connection.\n"));
+ }
+ }
+ connection_valid = 1;
+ }
+
+ last_ping_ = ACE_OS::gettimeofday ();
+
+ this->push_consumer_->push_structured_event (notification);
+}
+
+void
+TAO_Notify_StructuredPushConsumer::push (const CosNotification::StructuredEvent& event)
+{
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10) {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Structured push dispatching ORB id is %s.\n",
+ this->push_consumer_->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+
+ // Check if we have to validate connection
+ if ( !connection_valid ) {
+ try
+ {
+ CORBA::PolicyList_var inconsistent_policies;
+ this->push_consumer_->_validate_connection (inconsistent_policies.out());
+ }
+ catch (const CORBA::COMM_FAILURE&)
+ {
+ // Expected exception when a bad connection is found
+ if (TAO_debug_level >= 1)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Found bad connection.\n"));
+ }
+ }
+ connection_valid = 1;
+ }
+
+ last_ping_ = ACE_OS::gettimeofday ();
+
+ this->push_consumer_->push_structured_event (event);
+}
+
+/// Push a batch of events to this consumer.
+void
+TAO_Notify_StructuredPushConsumer::push (const CosNotification::EventBatch& event)
+{
+ ACE_ASSERT(false);
+ ACE_UNUSED_ARG (event);
+ // TODO exception?
+}
+
+void
+TAO_Notify_StructuredPushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer)
+{
+ TAO_Notify_StructuredPushConsumer* tmp = dynamic_cast<TAO_Notify_StructuredPushConsumer *> (old_consumer);
+ ACE_ASSERT(tmp != 0);
+ this->init(tmp->push_consumer_.in());
+ this->schedule_timer(false);
+}
+
+ACE_CString
+TAO_Notify_StructuredPushConsumer::get_ior (void) const
+{
+ ACE_CString result;
+ CORBA::ORB_var orb = TAO_Notify_PROPERTIES::instance()->orb();
+ try
+ {
+ CORBA::String_var ior = orb->object_to_string(this->push_consumer_.in());
+ result = static_cast<const char*> (ior.in ());
+ }
+ catch (const CORBA::Exception&)
+ {
+ result.fast_clear();
+ }
+ return result;
+}
+
+CORBA::Object_ptr
+TAO_Notify_StructuredPushConsumer::get_consumer (void)
+{
+ return CosNotifyComm::StructuredPushConsumer::_duplicate (this->push_consumer_.in ());
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL