summaryrefslogtreecommitdiff
path: root/trunk/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp')
-rw-r--r--trunk/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp597
1 files changed, 597 insertions, 0 deletions
diff --git a/trunk/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp b/trunk/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp
new file mode 100644
index 00000000000..66c4a913cdd
--- /dev/null
+++ b/trunk/TAO/orbsvcs/orbsvcs/CosEvent/CEC_TypedEventChannel.cpp
@@ -0,0 +1,597 @@
+// -*- C++ -*-
+//
+// $Id$
+
+#include "orbsvcs/CosEvent/CEC_TypedEventChannel.h"
+#include "orbsvcs/CosEvent/CEC_Dispatching.h"
+#include "orbsvcs/CosEvent/CEC_TypedConsumerAdmin.h"
+#include "orbsvcs/CosEvent/CEC_TypedSupplierAdmin.h"
+#include "orbsvcs/CosEvent/CEC_ConsumerControl.h"
+#include "orbsvcs/CosEvent/CEC_SupplierControl.h"
+#include "tao/debug.h"
+#include "tao/ORB_Core.h"
+#include "ace/Dynamic_Service.h"
+#include "ace/Reactor.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "orbsvcs/CosEvent/CEC_TypedEventChannel.inl"
+#endif /* __ACE_INLINE__ */
+
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+// Implementation skeleton constructor
+TAO_CEC_TypedEventChannel::
+TAO_CEC_TypedEventChannel (const TAO_CEC_TypedEventChannel_Attributes& attr,
+ TAO_CEC_Factory* factory,
+ int own_factory)
+ : typed_supplier_poa_ (PortableServer::POA::_duplicate (attr.typed_supplier_poa)),
+ typed_consumer_poa_ (PortableServer::POA::_duplicate (attr.typed_consumer_poa)),
+ orb_ (CORBA::ORB::_duplicate (attr.orb)),
+ interface_repository_ (CORBA::Repository::_duplicate (attr.interface_repository)),
+ factory_ (factory),
+ own_factory_ (own_factory),
+ consumer_reconnect_ (attr.consumer_reconnect),
+ supplier_reconnect_ (attr.supplier_reconnect),
+ disconnect_callbacks_ (attr.disconnect_callbacks),
+ destroy_on_shutdown_ (attr.destroy_on_shutdown),
+ destroyed_ (0)
+{
+ if (this->factory_ == 0)
+ {
+ this->factory_ =
+ ACE_Dynamic_Service<TAO_CEC_Factory>::instance ("CEC_Factory");
+ this->own_factory_ = 0;
+ ACE_ASSERT (this->factory_ != 0);
+ }
+
+ this->dispatching_ =
+ this->factory_->create_dispatching (this);
+ this->typed_consumer_admin_ =
+ this->factory_->create_consumer_admin (this);
+ this->typed_supplier_admin_ =
+ this->factory_->create_supplier_admin (this);
+ this->consumer_control_ =
+ this->factory_->create_consumer_control (this);
+ this->supplier_control_ =
+ this->factory_->create_supplier_control (this);
+}
+
+// Implementation skeleton destructor
+TAO_CEC_TypedEventChannel::~TAO_CEC_TypedEventChannel (void)
+{
+ this->clear_ifr_cache ();
+ this->interface_description_.close ();
+
+ this->factory_->destroy_dispatching (this->dispatching_);
+ this->dispatching_ = 0;
+
+ this->factory_->destroy_consumer_admin (this->typed_consumer_admin_);
+ this->typed_consumer_admin_ = 0;
+ this->factory_->destroy_supplier_admin (this->typed_supplier_admin_);
+ this->typed_supplier_admin_ = 0;
+
+ if (this->own_factory_)
+ delete this->factory_;
+}
+
+void
+TAO_CEC_TypedEventChannel::activate (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+{
+ this->dispatching_->activate ();
+ this->consumer_control_->activate ();
+ this->supplier_control_->activate ();
+}
+
+namespace
+{
+ struct ShutdownHandler : ACE_Event_Handler
+ {
+ ShutdownHandler (CORBA::ORB_ptr orb)
+ : orb_ (CORBA::ORB::_duplicate (orb)) {}
+ CORBA::ORB_var orb_;
+
+ virtual int handle_timeout (const ACE_Time_Value&, const void*)
+ {
+ orb_->shutdown (1);
+ return 0;
+ }
+
+ };
+}
+
+void
+TAO_CEC_TypedEventChannel::shutdown (ACE_ENV_SINGLE_ARG_DECL)
+{
+ this->dispatching_->shutdown ();
+ this->supplier_control_->shutdown ();
+ this->consumer_control_->shutdown ();
+
+ PortableServer::POA_var typed_consumer_poa =
+ this->typed_consumer_admin_->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var typed_consumer_id =
+ typed_consumer_poa->servant_to_id (this->typed_consumer_admin_ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ typed_consumer_poa->deactivate_object (typed_consumer_id.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ PortableServer::POA_var typed_supplier_poa =
+ this->typed_supplier_admin_->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var typed_supplier_id =
+ typed_supplier_poa->servant_to_id (this->typed_supplier_admin_ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ typed_supplier_poa->deactivate_object (typed_supplier_id.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->typed_supplier_admin_->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->typed_consumer_admin_->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ if (destroy_on_shutdown_)
+ {
+ // Deactivate the Typed EC
+ PortableServer::POA_var t_poa =
+ this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ PortableServer::ObjectId_var t_id =
+ t_poa->servant_to_id (this ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ t_poa->deactivate_object (t_id.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_Event_Handler *timer;
+ ACE_NEW (timer, ShutdownHandler (this->orb_.in ()));
+ ACE_Reactor *reactor = this->orb_->orb_core ()->reactor ();
+ reactor->schedule_timer (timer, 0, ACE_Time_Value (1));
+ }
+}
+
+void
+TAO_CEC_TypedEventChannel::connected (TAO_CEC_TypedProxyPushConsumer* consumer
+ ACE_ENV_ARG_DECL)
+{
+ this->typed_supplier_admin_->connected (consumer ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void
+TAO_CEC_TypedEventChannel::reconnected (TAO_CEC_TypedProxyPushConsumer* consumer
+ ACE_ENV_ARG_DECL)
+{
+ this->typed_supplier_admin_->reconnected (consumer ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void
+TAO_CEC_TypedEventChannel::disconnected (TAO_CEC_TypedProxyPushConsumer* consumer
+ ACE_ENV_ARG_DECL)
+{
+ this->typed_supplier_admin_->disconnected (consumer ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void
+TAO_CEC_TypedEventChannel::connected (TAO_CEC_ProxyPushSupplier* supplier
+ ACE_ENV_ARG_DECL)
+{
+ this->typed_consumer_admin_->connected (supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void
+TAO_CEC_TypedEventChannel::reconnected (TAO_CEC_ProxyPushSupplier* supplier
+ ACE_ENV_ARG_DECL)
+{
+ this->typed_consumer_admin_->reconnected (supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void
+TAO_CEC_TypedEventChannel::disconnected (TAO_CEC_ProxyPushSupplier* supplier
+ ACE_ENV_ARG_DECL)
+{
+ this->typed_consumer_admin_->disconnected (supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+// Find from the ifr cache the operation and return the parameter array pointer.
+TAO_CEC_Operation_Params *
+TAO_CEC_TypedEventChannel::find_from_ifr_cache (const char *operation)
+{
+ TAO_CEC_Operation_Params *found = 0;
+
+ this->interface_description_.find (operation, found);
+
+ return found;
+}
+
+// Insert the operation and its parameters into the ifr cache.
+int
+TAO_CEC_TypedEventChannel::insert_into_ifr_cache (const char *operation_,
+ TAO_CEC_Operation_Params *parameters_)
+{
+ // Make sure that the supplied Object reference is valid,
+ // i.e. not nil.
+ if (operation_ == 0 || parameters_ == 0)
+ {
+ errno = EINVAL;
+ return -1;
+ };
+
+ CORBA::String_var operation = CORBA::string_dup (operation_);
+
+ int result = this->interface_description_.bind (operation.in (), parameters_);
+
+ if (result == 0)
+ {
+ // Transfer ownership to the Object InterfaceDescription map.
+ (void) operation._retn ();
+ }
+
+ return result;
+}
+
+// Clear the ifr cache, freeing up all its contents.
+int
+TAO_CEC_TypedEventChannel::clear_ifr_cache (void)
+{
+ for (Iterator i = this->interface_description_.begin ();
+ i != this->interface_description_.end ();
+ ++i)
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** Destroying operation %s from ifr cache *****\n"),
+ const_cast<char *> ((*i).ext_id_)));
+ }
+
+ // Deallocate the operation
+ CORBA::string_free (const_cast<char *> ((*i).ext_id_));
+
+ // Destroy the parameter
+ delete ((*i).int_id_);
+ }
+
+ int result = this->interface_description_.unbind_all ();
+
+ return result;
+}
+
+
+// The function performs a lookup_id of the passed interface in the IFR,
+// and then obtains the FullInterfaceDescription.
+// The base interfaces for the interface are stored on this class.
+// All the operations and their parameters are then inserted in the ifr cache.
+// Function returns 0 if successful or -1 on a failure.
+int
+TAO_CEC_TypedEventChannel::cache_interface_description (const char *interface_
+ ACE_ENV_ARG_DECL)
+{
+ ACE_TRY
+ {
+ // Lookup the Interface Name in the IFR
+ CORBA::Contained_var contained =
+ this->interface_repository_->lookup_id (interface_ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Narrow the interface
+ CORBA::InterfaceDef_var interface =
+ CORBA::InterfaceDef::_narrow (contained.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (interface.in () ))
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** CORBA::InterfaceDef::_narrow failed for interface %s *****\n"),
+ interface_));
+ }
+ return -1;
+ }
+ else
+ {
+ // Obtain the full interface description
+ CORBA::InterfaceDef::FullInterfaceDescription_var fid =
+ interface->describe_interface (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Obtain the base interfaces
+ this->base_interfaces_ = fid->base_interfaces;
+ if (TAO_debug_level >= 10)
+ {
+ for (CORBA::ULong base=0; base<fid->base_interfaces.length(); base++)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** Base interface %s found on interface %s *****\n"),
+ static_cast<char const*>(fid->base_interfaces[base]),
+ interface_ ));
+ }
+ }
+
+ // Obtain the operations
+ for (CORBA::ULong oper=0; oper<fid->operations.length(); oper++)
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** Operation %s found on interface %s, num params %d *****\n"),
+ fid->operations[oper].name.in(),
+ interface_,
+ fid->operations[oper].parameters.length() ));
+ }
+
+ // Obtain the parameters
+ CORBA::ULong num_params = fid->operations[oper].parameters.length();
+ TAO_CEC_Operation_Params *oper_params = new TAO_CEC_Operation_Params (num_params);
+
+ for (CORBA::ULong param=0; param<num_params; param++)
+ {
+ oper_params->parameters_[param].name_ = fid->operations[oper].parameters[param].name.in();
+ oper_params->parameters_[param].type_ = fid->operations[oper].parameters[param].type;
+ switch (fid->operations[oper].parameters[param].mode)
+ {
+ case CORBA::PARAM_IN:
+ oper_params->parameters_[param].direction_ = CORBA::ARG_IN;
+ break;
+ case CORBA::PARAM_OUT:
+ oper_params->parameters_[param].direction_ = CORBA::ARG_OUT;
+ break;
+ case CORBA::PARAM_INOUT:
+ oper_params->parameters_[param].direction_ = CORBA::ARG_INOUT;
+ break;
+ }
+
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** Parameter %s found on operation %s *****\n"),
+ oper_params->parameters_[param].name_.in(),
+ fid->operations[oper].name.in() ));
+ }
+ }
+
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** Adding operation %s with %d parameters to the IFR cache *****\n"),
+ fid->operations[oper].name.in(),
+ oper_params->num_params_ ));
+ }
+
+ int result = insert_into_ifr_cache (fid->operations[oper].name.in(), oper_params);
+ if (result != 0)
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** Adding operation to IFR cache failed *****\n")));
+ }
+ }
+ }
+ }
+ }
+ ACE_CATCH (CORBA::SystemException, sysex)
+ {
+ if (TAO_debug_level >= 4)
+ {
+ ACE_PRINT_EXCEPTION (sysex, "during TAO_CEC_TypedEventChannel::cache_interface_description");
+ }
+ return -1;
+ }
+ ACE_CATCHANY
+ {
+ if (TAO_debug_level >= 4)
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "ACE_ANY_EXCEPTION raised during TAO_CEC_TypedEventChannel::cache_interface_description");
+ }
+ return -1;
+ }
+ ACE_ENDTRY;
+ return 0;
+}
+
+// A consumer is attempting to register its uses_interface.
+// Note only a single interface can be registered with this version of the EC.
+// For users that require more than one interface, start another EC.
+// If the passed uses_interface is the same as a registered interface the function returns 0.
+// If an attempt is made to register a second interface, this function will return -1
+// and the TypedConsumerAdmin will throw CosTypedEventChannelAdmin::NoSuchImplementation.
+// If neither a consumer nor a supplier has registered an interface,
+// the function calls cache_interface_description and returns 0 if successful.
+int
+TAO_CEC_TypedEventChannel::consumer_register_uses_interace (const char *uses_interface
+ ACE_ENV_ARG_DECL)
+{
+ // Check if a consumer has already registered an interface with the typed EC
+ if (this->uses_interface_.length() > 0)
+ {
+ // Check if the registered uses_interface_ == the new uses_interface
+ if (this->uses_interface_ == ACE_CString (uses_interface))
+ {
+ return 0;
+ }
+ else
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** different uses_interface_ already registered *****\n")));
+ }
+ return -1;
+ }
+ }
+
+ // Check if a supplier has already registered an inerface with the typed EC
+ if (this->supported_interface_.length() > 0)
+ {
+ // Check if the registered supported_interface_ == the new uses_interface
+ if (this->supported_interface_ == ACE_CString (uses_interface))
+ {
+ this->uses_interface_ = uses_interface;
+ return 0;
+ }
+ else
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** different supported_interface_ already registered *****\n")));
+ }
+ return -1;
+ }
+ }
+ else
+ {
+ // Neither a consumer nor a supplier has connected yet
+ int result = cache_interface_description (uses_interface ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (result == 0)
+ {
+ this->uses_interface_ = uses_interface;
+ }
+ return result;
+ }
+}
+
+// A supplier is attempting to register its supported_interface.
+// Note only a single interface can be registered with this version of the EC.
+// For users that require more than one interface, start another EC.
+// If the passed supported_interface is the same as a registered interface the function returns 0.
+// If an attempt is made to register a second interface, this function will return -1
+// and the TypedSupplierAdmin will throw CosTypedEventChannelAdmin::InterfaceNotSupported.
+// If neither a consumer nor a supplier has registered an interface,
+// the function calls cache_interface_description and returns 0 if successful.
+int
+TAO_CEC_TypedEventChannel::supplier_register_supported_interface (const char *supported_interface
+ ACE_ENV_ARG_DECL)
+{
+ // Check if a supplier has already registered an interface with the typed EC
+ if (this->supported_interface_.length() > 0)
+ {
+ // Check if the registered interface == the new supported_interface
+ if (this->supported_interface_ == ACE_CString (supported_interface))
+ {
+ return 0;
+ }
+ else
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** different supported_interface_ already registered *****\n")));
+ }
+ return -1;
+ }
+ }
+
+ // Check if a consumer has already registered an inerface with the typed EC
+ if (this->uses_interface_.length() > 0)
+ {
+ // Check if the registered uses_interface_ == the new supported_interface
+ if (this->uses_interface_ == ACE_CString (supported_interface))
+ {
+ this->supported_interface_ = supported_interface;
+ return 0;
+ }
+ else
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("***** different uses_interface_ already registered *****\n")));
+ }
+ return -1;
+ }
+ }
+ else
+ {
+ // Neither a consumer nor a supplier has connected yet
+ int result = cache_interface_description (supported_interface ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (result == 0)
+ {
+ this->supported_interface_ = supported_interface;
+ }
+ return result;
+ }
+}
+
+// Function creates a NVList and populates it from the parameter information.
+void
+TAO_CEC_TypedEventChannel::create_operation_list (TAO_CEC_Operation_Params *oper_params,
+ CORBA::NVList_out new_list
+ ACE_ENV_ARG_DECL)
+{
+ this->orb_->create_list (0, new_list ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ for (CORBA::ULong param=0; param<oper_params->num_params_; param++)
+ {
+
+ CORBA::Any any_1;
+ any_1._tao_set_typecode(oper_params->parameters_[param].type_.in ());
+
+ new_list->add_value (oper_params->parameters_[param].name_. in (),
+ any_1,
+ oper_params->parameters_[param].direction_
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+}
+
+// Function creates an empty NVList.
+void
+TAO_CEC_TypedEventChannel::create_list (CORBA::Long count,
+ CORBA::NVList_out new_list
+ ACE_ENV_ARG_DECL)
+{
+ this->orb_->create_list (count, new_list ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+// The CosTypedEventChannelAdmin::TypedEventChannel methods...
+CosTypedEventChannelAdmin::TypedConsumerAdmin_ptr
+TAO_CEC_TypedEventChannel::for_consumers (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ return this->typed_consumer_admin_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+CosTypedEventChannelAdmin::TypedSupplierAdmin_ptr
+TAO_CEC_TypedEventChannel::for_suppliers (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ return this->typed_supplier_admin_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+void
+TAO_CEC_TypedEventChannel::destroy (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ if (!destroyed_)
+ {
+ destroyed_ = 1;
+ this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+}
+
+CORBA::Policy_ptr
+TAO_CEC_TypedEventChannel::create_roundtrip_timeout_policy
+(const ACE_Time_Value &timeout)
+{
+ return this->factory_->create_roundtrip_timeout_policy (timeout);
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL