summaryrefslogtreecommitdiff
path: root/trunk/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp')
-rw-r--r--trunk/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp582
1 files changed, 582 insertions, 0 deletions
diff --git a/trunk/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp b/trunk/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp
new file mode 100644
index 00000000000..0d85bbe8155
--- /dev/null
+++ b/trunk/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp
@@ -0,0 +1,582 @@
+// $Id$
+
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_Factory.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_SupplierAdmin.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_ConsumerAdmin.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_ProxyConsumer.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_ProxySupplier.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FtEventServiceInterceptor.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FT_ProxyAdmin_Base.h"
+#include "orbsvcs/FtRtEvent/EventChannel/IOGR_Maker.h"
+#include "orbsvcs/FtRtEvent/EventChannel/Replication_Service.h"
+#include "../Utils/Safe_InputCDR.h"
+#include "orbsvcs/FtRtecEventCommC.h"
+
+
+ACE_RCSID (EventChannel,
+ FTEC_Event_Channel_Impl,
+ "$Id$")
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+void obtain_push_supplier(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op
+ ACE_ENV_ARG_DECL)
+{
+ ec->consumer_admin()->obtain_proxy(op ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void obtain_push_consumer(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op
+ ACE_ENV_ARG_DECL)
+{
+ ec->supplier_admin()->obtain_proxy(op ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void obtain_push_consumer_and_connect(TAO_FTEC_Event_Channel_Impl* ec,
+ const FtRtecEventChannelAdmin::ObjectId& oid,
+ RtecEventComm::PushSupplier_ptr push_supplier,
+ const RtecEventChannelAdmin::SupplierQOS & qos
+ ACE_ENV_ARG_DECL)
+{
+ Request_Context_Repository().set_object_id(oid ACE_ENV_ARG_PARAMETER);
+
+ RtecEventChannelAdmin::ProxyPushConsumer_var consumer =
+ ec->supplier_admin()->obtain(ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ ACE_CHECK;
+
+ ACE_TRY {
+ consumer->connect_push_supplier(push_supplier, qos
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHALL {
+ ec->supplier_admin()->disconnect(consumer.in());
+ ACE_RE_THROW;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
+}
+
+
+void connect_push_supplier(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op
+ ACE_ENV_ARG_DECL)
+{
+ PortableServer::POA_var poa= ec->supplier_poa();
+ ACE_CHECK;
+ FtRtecEventChannelAdmin::Connect_push_supplier_param& param
+ = op.param.connect_supplier_param();
+
+ TAO_FTEC_ProxyPushConsumer* proxy
+ = ec->find_proxy_push_consumer(op.object_id);
+
+ if (proxy == NULL) {
+ obtain_push_consumer_and_connect(ec,
+ op.object_id,
+ param.push_supplier.in(),
+ param.qos
+ ACE_ENV_ARG_PARAMETER);
+ }
+ else {
+ proxy->connect_push_supplier(param.push_supplier.in(),
+ param.qos
+ ACE_ENV_ARG_PARAMETER);
+ }
+ ACE_CHECK;
+}
+
+void obtain_push_supplier_and_connect(TAO_FTEC_Event_Channel_Impl* ec,
+ const FtRtecEventChannelAdmin::ObjectId& oid,
+ RtecEventComm::PushConsumer_ptr push_consumer,
+ const RtecEventChannelAdmin::ConsumerQOS & qos
+ ACE_ENV_ARG_DECL)
+{
+ Request_Context_Repository().set_object_id(oid ACE_ENV_ARG_PARAMETER);
+
+ RtecEventChannelAdmin::ProxyPushSupplier_var supplier =
+ ec->consumer_admin()->obtain(ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ ACE_CHECK;
+
+ ACE_TRY {
+ supplier->connect_push_consumer(push_consumer, qos
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHALL {
+ ec->consumer_admin()->disconnect(supplier.in());
+ ACE_RE_THROW;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
+}
+
+
+void connect_push_consumer(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op
+ ACE_ENV_ARG_DECL)
+{
+ PortableServer::POA_var poa= ec->consumer_poa();
+ ACE_CHECK;
+ FtRtecEventChannelAdmin::Connect_push_consumer_param& param
+ = op.param.connect_consumer_param();
+
+ TAO_FTEC_ProxyPushSupplier* proxy = ec->find_proxy_push_supplier(op.object_id);
+
+ if (proxy == NULL){
+ obtain_push_supplier_and_connect(ec,
+ op.object_id,
+ param.push_consumer.in(),
+ param.qos
+ ACE_ENV_ARG_PARAMETER);
+ }
+ else {
+ proxy->connect_push_consumer(param.push_consumer.in(),
+ param.qos
+ ACE_ENV_ARG_PARAMETER);
+ }
+ ACE_CHECK;
+}
+
+void disconnect_push_supplier(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op
+ ACE_ENV_ARG_DECL)
+{
+ PortableServer::POA_var poa= ec->consumer_poa();
+ ACE_CHECK;
+
+ TAO_FTEC_ProxyPushSupplier* proxy = ec->find_proxy_push_supplier(op.object_id);
+
+
+ if (proxy == NULL) // proxy not found
+ ACE_THROW(FTRT::InvalidUpdate());
+
+ ACE_CHECK;
+ proxy->disconnect_push_supplier(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void disconnect_push_consumer(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op
+ ACE_ENV_ARG_DECL)
+{
+ PortableServer::POA_var poa= ec->supplier_poa();
+ ACE_CHECK;
+
+ TAO_FTEC_ProxyPushConsumer* proxy = ec->find_proxy_push_consumer(op.object_id);
+
+ if (proxy == NULL) // proxy not found
+ ACE_THROW(FTRT::InvalidUpdate());
+
+ ACE_CHECK;
+ proxy->disconnect_push_consumer(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+
+void suspend_connection (TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op
+ ACE_ENV_ARG_DECL)
+{
+ PortableServer::POA_var poa= ec->consumer_poa();
+ ACE_CHECK;
+
+ TAO_FTEC_ProxyPushSupplier* proxy = ec->find_proxy_push_supplier(op.object_id);
+
+
+ if (proxy == NULL) // proxy not found
+ ACE_THROW(FTRT::InvalidUpdate());
+
+ ACE_CHECK;
+ proxy->suspend_connection(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void resume_connection(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op
+ ACE_ENV_ARG_DECL)
+{
+ TAO_FTEC_ProxyPushSupplier* proxy = ec->find_proxy_push_supplier(op.object_id);
+
+
+ if (proxy == NULL) // proxy not found
+ ACE_THROW(FTRT::InvalidUpdate());
+
+ ACE_CHECK;
+ proxy->resume_connection(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+
+typedef void (*Set_update_fun)(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op
+ ACE_ENV_ARG_DECL);
+
+Set_update_fun const update_table[] = {
+ &obtain_push_supplier,
+ &obtain_push_consumer,
+ &disconnect_push_supplier,
+ &disconnect_push_consumer,
+ &suspend_connection,
+ &resume_connection,
+ &connect_push_supplier,
+ &connect_push_consumer
+};
+
+TAO_FTEC_Event_Channel_Impl::TAO_FTEC_Event_Channel_Impl(
+ const TAO_EC_Event_Channel_Attributes& attributes)
+ : TAO_EC_Event_Channel_Base(attributes, new TAO_FTEC_Basic_Factory, false)
+{
+ this->scheduler_ =
+ CORBA::Object::_duplicate (attributes.scheduler);
+
+ this->create_strategies ();
+
+}
+
+TAO_FTEC_Event_Channel_Impl::~TAO_FTEC_Event_Channel_Impl()
+{
+}
+
+
+TAO_FTEC_Basic_Factory*
+TAO_FTEC_Event_Channel_Impl::factory()
+{
+ return static_cast<TAO_FTEC_Basic_Factory*> (TAO_EC_Event_Channel_Base::factory());
+}
+
+
+/// Start the internal threads (if any), etc.
+/// After this call the EC can be used.
+void
+TAO_FTEC_Event_Channel_Impl::activate_object (
+ CORBA::ORB_var orb,
+ const FtRtecEventComm::ObjectId& supplier_admin_oid,
+ const FtRtecEventComm::ObjectId& consumer_admin_oid
+ ACE_ENV_ARG_DECL)
+{
+
+ iogr_maker_.init(orb.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ TAO_EC_Event_Channel_Base::activate(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ supplier_admin()->activate(supplier_admin_oid ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ consumer_admin()->activate(consumer_admin_oid ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+
+
+// = The RtecEventChannelAdmin::EventChannel methods...
+/// The default implementation is:
+/// this->consumer_admin ()->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+RtecEventChannelAdmin::ConsumerAdmin_ptr
+TAO_FTEC_Event_Channel_Impl::for_consumers (ACE_ENV_SINGLE_ARG_DECL)
+ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ CORBA::Object_var obj = consumer_admin()->reference(ACE_ENV_SINGLE_ARG_PARAMETER);
+ obj = IOGR_Maker::instance()->forge_iogr(obj.in()
+ ACE_ENV_ARG_PARAMETER);
+ return RtecEventChannelAdmin::ConsumerAdmin::_narrow(obj.in() ACE_ENV_ARG_PARAMETER);
+}
+
+
+/// The default implementation is:
+/// this->supplier_admin ()->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+RtecEventChannelAdmin::SupplierAdmin_ptr
+TAO_FTEC_Event_Channel_Impl::for_suppliers (ACE_ENV_SINGLE_ARG_DECL)
+ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ CORBA::Object_var obj = supplier_admin()->reference(ACE_ENV_SINGLE_ARG_PARAMETER);
+ obj = IOGR_Maker::instance()->forge_iogr(obj.in()
+ ACE_ENV_ARG_PARAMETER);
+ return RtecEventChannelAdmin::SupplierAdmin::_narrow(obj.in() ACE_ENV_ARG_PARAMETER);
+}
+
+
+::FtRtecEventChannelAdmin::ObjectId *
+TAO_FTEC_Event_Channel_Impl::connect_push_consumer (
+ RtecEventComm::PushConsumer_ptr push_consumer,
+ const RtecEventChannelAdmin::ConsumerQOS & qos
+ ACE_ENV_ARG_DECL
+ )
+{
+ CORBA::Any_var any
+ = Request_Context_Repository().get_cached_result(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN(0);
+
+ FtRtecEventChannelAdmin::ObjectId *oid;
+
+ if (any.in() >>= oid) {
+ FtRtecEventChannelAdmin::ObjectId* result;
+ ACE_NEW_THROW_EX(result,
+ FtRtecEventChannelAdmin::ObjectId(*oid),
+ CORBA::NO_MEMORY());
+ return result;
+ }
+
+
+ ACE_NEW_THROW_EX(oid, FtRtecEventChannelAdmin::ObjectId, CORBA::NO_MEMORY());
+
+ FtRtecEventChannelAdmin::ObjectId_var object_id = oid;
+
+ Request_Context_Repository().generate_object_id(*oid ACE_ENV_ARG_PARAMETER);
+
+ obtain_push_supplier_and_connect(this,
+ object_id.in(),
+ push_consumer,
+ qos
+ ACE_ENV_ARG_PARAMETER);
+
+ ACE_CHECK_RETURN(0);
+
+ return object_id._retn();
+}
+
+
+::FtRtecEventChannelAdmin::ObjectId *
+TAO_FTEC_Event_Channel_Impl::connect_push_supplier (
+ RtecEventComm::PushSupplier_ptr push_supplier,
+ const RtecEventChannelAdmin::SupplierQOS & qos
+ ACE_ENV_ARG_DECL
+ )
+{
+ CORBA::Any_var any
+ = Request_Context_Repository().get_cached_result(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN(0);
+
+ FtRtecEventChannelAdmin::ObjectId *oid;
+
+ if (any.in() >>= oid) {
+ FtRtecEventChannelAdmin::ObjectId* result;
+ ACE_NEW_THROW_EX(result,
+ FtRtecEventChannelAdmin::ObjectId(*oid),
+ CORBA::NO_MEMORY());
+ return result;
+ }
+
+
+ ACE_NEW_THROW_EX(oid, FtRtecEventChannelAdmin::ObjectId, CORBA::NO_MEMORY());
+ FtRtecEventChannelAdmin::ObjectId_var object_id = oid;
+
+ Request_Context_Repository().generate_object_id(*oid ACE_ENV_ARG_PARAMETER);
+
+ obtain_push_consumer_and_connect(this,
+ object_id.in(),
+ push_supplier,
+ qos
+ ACE_ENV_ARG_PARAMETER);
+
+ ACE_CHECK_RETURN(0);
+
+ return object_id._retn();
+
+}
+
+void TAO_FTEC_Event_Channel_Impl::disconnect_push_supplier (
+ const FtRtecEventChannelAdmin::ObjectId & oid
+ ACE_ENV_ARG_DECL
+ )
+{
+ if (Request_Context_Repository().is_executed_request())
+ return;
+
+ TAO_FTEC_ProxyPushSupplier* proxy = this->find_proxy_push_supplier(oid);
+
+ if (proxy == NULL) // proxy not found
+ return;
+
+ ACE_CHECK;
+ proxy->disconnect_push_supplier(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void TAO_FTEC_Event_Channel_Impl::disconnect_push_consumer (
+ const FtRtecEventChannelAdmin::ObjectId & oid
+ ACE_ENV_ARG_DECL
+ )
+{
+ if (Request_Context_Repository().is_executed_request())
+ return;
+
+ TAO_FTEC_ProxyPushConsumer* proxy = this->find_proxy_push_consumer(oid);
+
+ if (proxy == NULL) // proxy not found
+ return;
+
+ ACE_CHECK;
+ proxy->disconnect_push_consumer(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void TAO_FTEC_Event_Channel_Impl::suspend_push_supplier (
+ const FtRtecEventChannelAdmin::ObjectId & oid
+ ACE_ENV_ARG_DECL
+ )
+{
+ if (Request_Context_Repository().is_executed_request())
+ return;
+
+ TAO_FTEC_ProxyPushSupplier* proxy = this->find_proxy_push_supplier(oid);
+
+ if (proxy == NULL) // proxy not found
+ ACE_THROW(FtRtecEventComm::InvalidObjectID());
+
+ ACE_CHECK;
+ proxy->suspend_connection(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+
+void TAO_FTEC_Event_Channel_Impl::resume_push_supplier (
+ const FtRtecEventChannelAdmin::ObjectId & oid
+ ACE_ENV_ARG_DECL
+ )
+{
+ if (Request_Context_Repository().is_executed_request())
+ return;
+
+ TAO_FTEC_ProxyPushSupplier* proxy = this->find_proxy_push_supplier(oid);
+
+ if (proxy == NULL) // proxy not found
+ ACE_THROW(FtRtecEventComm::InvalidObjectID());
+
+ ACE_CHECK;
+ proxy->resume_connection(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void TAO_FTEC_Event_Channel_Impl::push (
+ const FtRtecEventChannelAdmin::ObjectId & oid,
+ const RtecEventComm::EventSet & data
+ ACE_ENV_ARG_DECL
+ )
+{
+ TAO_FTEC_ProxyPushConsumer* proxy = this->find_proxy_push_consumer(oid);
+
+ if (proxy == NULL) // proxy not found
+ ACE_THROW(FtRtecEventComm::InvalidObjectID());
+
+ proxy->push(data ACE_ENV_ARG_PARAMETER);
+}
+
+
+
+
+void TAO_FTEC_Event_Channel_Impl::get_state (
+ FtRtecEventChannelAdmin::EventChannelState & state
+ ACE_ENV_ARG_DECL
+ )
+{
+ FtEventServiceInterceptor::instance()->get_state(state.cached_operation_results);
+ this->supplier_admin()->get_state(state.supplier_admin_state ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ this->consumer_admin()->get_state(state.consumer_admin_state ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+
+
+void TAO_FTEC_Event_Channel_Impl::set_state (const FTRT::State & stat
+ ACE_ENV_ARG_DECL)
+{
+ FtRtecEventChannelAdmin::EventChannelState state;
+
+ Safe_InputCDR cdr((const char*)stat.get_buffer(), stat.length());
+ cdr >> state;
+
+ FtEventServiceInterceptor::instance()->set_state(state.cached_operation_results);
+ this->supplier_admin()->set_state(state.supplier_admin_state ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ this->consumer_admin()->set_state(state.consumer_admin_state ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+
+void TAO_FTEC_Event_Channel_Impl::set_update (const FTRT::State & s
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException, FTRT::InvalidUpdate))
+{
+ FTRTEC::Replication_Service::instance()->check_validity(ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ if (!Request_Context_Repository().is_executed_request()) {
+ Safe_InputCDR cdr((const char*)s.get_buffer(), s.length());
+
+ FtRtecEventChannelAdmin::Operation_var op(new FtRtecEventChannelAdmin::Operation);
+ if (!(cdr >> *op)) {
+ ACE_THROW(FTRT::InvalidUpdate() );
+ }
+
+ (update_table[op->param._d()])(this, *op ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ }
+}
+
+
+TAO_FTEC_ConsumerAdmin* TAO_FTEC_Event_Channel_Impl::consumer_admin (void) const
+{
+ return static_cast<TAO_FTEC_ConsumerAdmin*> (TAO_EC_Event_Channel_Base::consumer_admin());
+}
+
+/// Access the supplier admin implementation, useful for controlling
+/// the activation...
+TAO_FTEC_SupplierAdmin* TAO_FTEC_Event_Channel_Impl::supplier_admin (void) const
+{
+ return static_cast<TAO_FTEC_SupplierAdmin*> (TAO_EC_Event_Channel_Base::supplier_admin());
+}
+
+
+TAO_FTEC_ProxyPushSupplier*
+TAO_FTEC_Event_Channel_Impl::find_proxy_push_supplier(const FtRtecEventChannelAdmin::ObjectId& id)
+{
+ ACE_TRY_NEW_ENV {
+ PortableServer::POA_var poa = consumer_poa();
+
+ const PortableServer::Servant servant = poa->id_to_servant(
+ reinterpret_cast<const PortableServer::ObjectId&> (id)
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ POA_RtecEventChannelAdmin::ProxyPushSupplier_ptr obj =
+ dynamic_cast<POA_RtecEventChannelAdmin::ProxyPushSupplier_ptr> (servant);
+
+ return static_cast<TAO_FTEC_ProxyPushSupplier*> (obj);
+ }
+ ACE_CATCHALL {
+ }
+ ACE_ENDTRY;
+ return 0;
+}
+
+TAO_FTEC_ProxyPushConsumer*
+TAO_FTEC_Event_Channel_Impl::find_proxy_push_consumer(const FtRtecEventChannelAdmin::ObjectId& id)
+{
+ ACE_TRY_NEW_ENV {
+ PortableServer::POA_var poa= supplier_poa();
+
+ const PortableServer::Servant servant = poa->id_to_servant(
+ reinterpret_cast<const PortableServer::ObjectId&> (id)
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ POA_RtecEventChannelAdmin::ProxyPushConsumer_ptr obj =
+ dynamic_cast<POA_RtecEventChannelAdmin::ProxyPushConsumer_ptr> (servant);
+
+ return static_cast<TAO_FTEC_ProxyPushConsumer*> (obj);
+ }
+ ACE_CATCHALL {
+ }
+ ACE_ENDTRY;
+ return 0;
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL