summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp495
1 files changed, 495 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp
new file mode 100644
index 00000000000..0a871bebfdf
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.cpp
@@ -0,0 +1,495 @@
+// $Id$
+
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel_Impl.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_Factory.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_SupplierAdmin.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_ConsumerAdmin.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_ProxyConsumer.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FTEC_ProxySupplier.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FtEventServiceInterceptor.h"
+#include "orbsvcs/FtRtEvent/EventChannel/FT_ProxyAdmin_Base.h"
+#include "orbsvcs/FtRtEvent/EventChannel/IOGR_Maker.h"
+#include "orbsvcs/FtRtEvent/EventChannel/Replication_Service.h"
+#include "../Utils/Safe_InputCDR.h"
+#include "orbsvcs/FtRtecEventCommC.h"
+
+
+ACE_RCSID (EventChannel,
+ FTEC_Event_Channel_Impl,
+ "$Id$")
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+void obtain_push_supplier(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op)
+{
+ ec->consumer_admin()->obtain_proxy(op);
+}
+
+void obtain_push_consumer(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op)
+{
+ ec->supplier_admin()->obtain_proxy(op);
+}
+
+void obtain_push_consumer_and_connect(TAO_FTEC_Event_Channel_Impl* ec,
+ const FtRtecEventChannelAdmin::ObjectId& oid,
+ RtecEventComm::PushSupplier_ptr push_supplier,
+ const RtecEventChannelAdmin::SupplierQOS & qos)
+{
+ Request_Context_Repository().set_object_id(oid);
+
+ RtecEventChannelAdmin::ProxyPushConsumer_var consumer =
+ ec->supplier_admin()->obtain();
+
+
+ try{
+ consumer->connect_push_supplier(push_supplier, qos);
+ }
+ catch (...){
+ ec->supplier_admin()->disconnect(consumer.in());
+ throw;
+ }
+}
+
+
+void connect_push_supplier(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op)
+{
+ PortableServer::POA_var poa= ec->supplier_poa();
+ FtRtecEventChannelAdmin::Connect_push_supplier_param& param
+ = op.param.connect_supplier_param();
+
+ TAO_FTEC_ProxyPushConsumer* proxy
+ = ec->find_proxy_push_consumer(op.object_id);
+
+ if (proxy == NULL) {
+ obtain_push_consumer_and_connect(ec,
+ op.object_id,
+ param.push_supplier.in(),
+ param.qos);
+ }
+ else {
+ proxy->connect_push_supplier(param.push_supplier.in(),
+ param.qos);
+ }
+}
+
+void obtain_push_supplier_and_connect(TAO_FTEC_Event_Channel_Impl* ec,
+ const FtRtecEventChannelAdmin::ObjectId& oid,
+ RtecEventComm::PushConsumer_ptr push_consumer,
+ const RtecEventChannelAdmin::ConsumerQOS & qos)
+{
+ Request_Context_Repository().set_object_id(oid);
+
+ RtecEventChannelAdmin::ProxyPushSupplier_var supplier =
+ ec->consumer_admin()->obtain();
+
+
+ try{
+ supplier->connect_push_consumer(push_consumer, qos);
+ }
+ catch (...){
+ ec->consumer_admin()->disconnect(supplier.in());
+ throw;
+ }
+}
+
+
+void connect_push_consumer(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op)
+{
+ PortableServer::POA_var poa= ec->consumer_poa();
+ FtRtecEventChannelAdmin::Connect_push_consumer_param& param
+ = op.param.connect_consumer_param();
+
+ TAO_FTEC_ProxyPushSupplier* proxy = ec->find_proxy_push_supplier(op.object_id);
+
+ if (proxy == NULL){
+ obtain_push_supplier_and_connect(ec,
+ op.object_id,
+ param.push_consumer.in(),
+ param.qos);
+ }
+ else {
+ proxy->connect_push_consumer(param.push_consumer.in(),
+ param.qos);
+ }
+}
+
+void disconnect_push_supplier(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op)
+{
+ PortableServer::POA_var poa= ec->consumer_poa();
+
+ TAO_FTEC_ProxyPushSupplier* proxy = ec->find_proxy_push_supplier(op.object_id);
+
+
+ if (proxy == NULL) // proxy not found
+ throw FTRT::InvalidUpdate();
+
+ proxy->disconnect_push_supplier();
+}
+
+void disconnect_push_consumer(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op)
+{
+ PortableServer::POA_var poa= ec->supplier_poa();
+
+ TAO_FTEC_ProxyPushConsumer* proxy = ec->find_proxy_push_consumer(op.object_id);
+
+ if (proxy == NULL) // proxy not found
+ throw FTRT::InvalidUpdate();
+
+ proxy->disconnect_push_consumer();
+}
+
+
+void suspend_connection (TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op)
+{
+ PortableServer::POA_var poa= ec->consumer_poa();
+
+ TAO_FTEC_ProxyPushSupplier* proxy = ec->find_proxy_push_supplier(op.object_id);
+
+
+ if (proxy == NULL) // proxy not found
+ throw FTRT::InvalidUpdate();
+
+ proxy->suspend_connection();
+}
+
+void resume_connection(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op)
+{
+ TAO_FTEC_ProxyPushSupplier* proxy = ec->find_proxy_push_supplier(op.object_id);
+
+
+ if (proxy == NULL) // proxy not found
+ throw FTRT::InvalidUpdate();
+
+ proxy->resume_connection();
+}
+
+
+typedef void (*Set_update_fun)(TAO_FTEC_Event_Channel_Impl* ec,
+ FtRtecEventChannelAdmin::Operation& op);
+
+Set_update_fun const update_table[] = {
+ &obtain_push_supplier,
+ &obtain_push_consumer,
+ &disconnect_push_supplier,
+ &disconnect_push_consumer,
+ &suspend_connection,
+ &resume_connection,
+ &connect_push_supplier,
+ &connect_push_consumer
+};
+
+TAO_FTEC_Event_Channel_Impl::TAO_FTEC_Event_Channel_Impl(
+ const TAO_EC_Event_Channel_Attributes& attributes)
+ : TAO_EC_Event_Channel_Base(attributes, new TAO_FTEC_Basic_Factory, false)
+{
+ this->scheduler_ =
+ CORBA::Object::_duplicate (attributes.scheduler);
+
+ this->create_strategies ();
+
+}
+
+TAO_FTEC_Event_Channel_Impl::~TAO_FTEC_Event_Channel_Impl()
+{
+}
+
+
+TAO_FTEC_Basic_Factory*
+TAO_FTEC_Event_Channel_Impl::factory()
+{
+ return static_cast<TAO_FTEC_Basic_Factory*> (TAO_EC_Event_Channel_Base::factory());
+}
+
+
+/// Start the internal threads (if any), etc.
+/// After this call the EC can be used.
+void
+TAO_FTEC_Event_Channel_Impl::activate_object (
+ CORBA::ORB_var orb,
+ const FtRtecEventComm::ObjectId& supplier_admin_oid,
+ const FtRtecEventComm::ObjectId& consumer_admin_oid)
+{
+
+ iogr_maker_.init(orb.in());
+
+ TAO_EC_Event_Channel_Base::activate();
+
+ supplier_admin()->activate(supplier_admin_oid);
+
+ consumer_admin()->activate(consumer_admin_oid);
+}
+
+
+
+// = The RtecEventChannelAdmin::EventChannel methods...
+/// The default implementation is:
+/// this->consumer_admin ()->_this ();
+RtecEventChannelAdmin::ConsumerAdmin_ptr
+TAO_FTEC_Event_Channel_Impl::for_consumers (void)
+{
+ CORBA::Object_var obj = consumer_admin()->reference();
+ obj = IOGR_Maker::instance()->forge_iogr(obj.in());
+ return RtecEventChannelAdmin::ConsumerAdmin::_narrow(obj.in());
+}
+
+
+/// The default implementation is:
+/// this->supplier_admin ()->_this ();
+RtecEventChannelAdmin::SupplierAdmin_ptr
+TAO_FTEC_Event_Channel_Impl::for_suppliers (void)
+{
+ CORBA::Object_var obj = supplier_admin()->reference();
+ obj = IOGR_Maker::instance()->forge_iogr(obj.in());
+ return RtecEventChannelAdmin::SupplierAdmin::_narrow(obj.in());
+}
+
+
+::FtRtecEventChannelAdmin::ObjectId *
+TAO_FTEC_Event_Channel_Impl::connect_push_consumer (
+ RtecEventComm::PushConsumer_ptr push_consumer,
+ const RtecEventChannelAdmin::ConsumerQOS & qos
+ )
+{
+ CORBA::Any_var any
+ = Request_Context_Repository().get_cached_result();
+
+ FtRtecEventChannelAdmin::ObjectId *oid;
+
+ if (any.in() >>= oid) {
+ FtRtecEventChannelAdmin::ObjectId* result;
+ ACE_NEW_THROW_EX(result,
+ FtRtecEventChannelAdmin::ObjectId(*oid),
+ CORBA::NO_MEMORY());
+ return result;
+ }
+
+
+ ACE_NEW_THROW_EX(oid, FtRtecEventChannelAdmin::ObjectId, CORBA::NO_MEMORY());
+
+ FtRtecEventChannelAdmin::ObjectId_var object_id = oid;
+
+ Request_Context_Repository().generate_object_id(*oid);
+
+ obtain_push_supplier_and_connect(this,
+ object_id.in(),
+ push_consumer,
+ qos);
+
+
+ return object_id._retn();
+}
+
+
+::FtRtecEventChannelAdmin::ObjectId *
+TAO_FTEC_Event_Channel_Impl::connect_push_supplier (
+ RtecEventComm::PushSupplier_ptr push_supplier,
+ const RtecEventChannelAdmin::SupplierQOS & qos
+ )
+{
+ CORBA::Any_var any
+ = Request_Context_Repository().get_cached_result();
+
+ FtRtecEventChannelAdmin::ObjectId *oid;
+
+ if (any.in() >>= oid) {
+ FtRtecEventChannelAdmin::ObjectId* result;
+ ACE_NEW_THROW_EX(result,
+ FtRtecEventChannelAdmin::ObjectId(*oid),
+ CORBA::NO_MEMORY());
+ return result;
+ }
+
+
+ ACE_NEW_THROW_EX(oid, FtRtecEventChannelAdmin::ObjectId, CORBA::NO_MEMORY());
+ FtRtecEventChannelAdmin::ObjectId_var object_id = oid;
+
+ Request_Context_Repository().generate_object_id(*oid);
+
+ obtain_push_consumer_and_connect(this,
+ object_id.in(),
+ push_supplier,
+ qos);
+
+
+ return object_id._retn();
+
+}
+
+void TAO_FTEC_Event_Channel_Impl::disconnect_push_supplier (
+ const FtRtecEventChannelAdmin::ObjectId & oid
+ )
+{
+ if (Request_Context_Repository().is_executed_request())
+ return;
+
+ TAO_FTEC_ProxyPushSupplier* proxy = this->find_proxy_push_supplier(oid);
+
+ if (proxy == NULL) // proxy not found
+ return;
+
+ proxy->disconnect_push_supplier();
+}
+
+void TAO_FTEC_Event_Channel_Impl::disconnect_push_consumer (
+ const FtRtecEventChannelAdmin::ObjectId & oid
+ )
+{
+ if (Request_Context_Repository().is_executed_request())
+ return;
+
+ TAO_FTEC_ProxyPushConsumer* proxy = this->find_proxy_push_consumer(oid);
+
+ if (proxy == NULL) // proxy not found
+ return;
+
+ proxy->disconnect_push_consumer();
+}
+
+void TAO_FTEC_Event_Channel_Impl::suspend_push_supplier (
+ const FtRtecEventChannelAdmin::ObjectId & oid
+ )
+{
+ if (Request_Context_Repository().is_executed_request())
+ return;
+
+ TAO_FTEC_ProxyPushSupplier* proxy = this->find_proxy_push_supplier(oid);
+
+ if (proxy == NULL) // proxy not found
+ throw FtRtecEventComm::InvalidObjectID();
+
+ proxy->suspend_connection();
+}
+
+
+void TAO_FTEC_Event_Channel_Impl::resume_push_supplier (
+ const FtRtecEventChannelAdmin::ObjectId & oid
+ )
+{
+ if (Request_Context_Repository().is_executed_request())
+ return;
+
+ TAO_FTEC_ProxyPushSupplier* proxy = this->find_proxy_push_supplier(oid);
+
+ if (proxy == NULL) // proxy not found
+ throw FtRtecEventComm::InvalidObjectID();
+
+ proxy->resume_connection();
+}
+
+void TAO_FTEC_Event_Channel_Impl::push (
+ const FtRtecEventChannelAdmin::ObjectId & oid,
+ const RtecEventComm::EventSet & data
+ )
+{
+ TAO_FTEC_ProxyPushConsumer* proxy = this->find_proxy_push_consumer(oid);
+
+ if (proxy == NULL) // proxy not found
+ throw FtRtecEventComm::InvalidObjectID();
+
+ proxy->push(data);
+}
+
+
+
+
+void TAO_FTEC_Event_Channel_Impl::get_state (
+ FtRtecEventChannelAdmin::EventChannelState & state
+ )
+{
+ FtEventServiceInterceptor::instance()->get_state(state.cached_operation_results);
+ this->supplier_admin()->get_state(state.supplier_admin_state);
+ this->consumer_admin()->get_state(state.consumer_admin_state);
+}
+
+
+
+void TAO_FTEC_Event_Channel_Impl::set_state (const FTRT::State & stat)
+{
+ FtRtecEventChannelAdmin::EventChannelState state;
+
+ Safe_InputCDR cdr((const char*)stat.get_buffer(), stat.length());
+ cdr >> state;
+
+ FtEventServiceInterceptor::instance()->set_state(state.cached_operation_results);
+ this->supplier_admin()->set_state(state.supplier_admin_state);
+ this->consumer_admin()->set_state(state.consumer_admin_state);
+}
+
+
+void TAO_FTEC_Event_Channel_Impl::set_update (const FTRT::State & s)
+{
+ FTRTEC::Replication_Service::instance()->check_validity();
+
+ if (!Request_Context_Repository().is_executed_request()) {
+ Safe_InputCDR cdr((const char*)s.get_buffer(), s.length());
+
+ FtRtecEventChannelAdmin::Operation_var op(new FtRtecEventChannelAdmin::Operation);
+ if (!(cdr >> *op)) {
+ throw FTRT::InvalidUpdate();
+ }
+
+ (update_table[op->param._d()])(this, *op);
+
+ }
+}
+
+
+TAO_FTEC_ConsumerAdmin* TAO_FTEC_Event_Channel_Impl::consumer_admin (void) const
+{
+ return static_cast<TAO_FTEC_ConsumerAdmin*> (TAO_EC_Event_Channel_Base::consumer_admin());
+}
+
+/// Access the supplier admin implementation, useful for controlling
+/// the activation...
+TAO_FTEC_SupplierAdmin* TAO_FTEC_Event_Channel_Impl::supplier_admin (void) const
+{
+ return static_cast<TAO_FTEC_SupplierAdmin*> (TAO_EC_Event_Channel_Base::supplier_admin());
+}
+
+
+TAO_FTEC_ProxyPushSupplier*
+TAO_FTEC_Event_Channel_Impl::find_proxy_push_supplier(const FtRtecEventChannelAdmin::ObjectId& id)
+{
+ try{
+ PortableServer::POA_var poa = consumer_poa();
+
+ const PortableServer::Servant servant = poa->id_to_servant(
+ reinterpret_cast<const PortableServer::ObjectId&> (id));
+ POA_RtecEventChannelAdmin::ProxyPushSupplier_ptr obj =
+ dynamic_cast<POA_RtecEventChannelAdmin::ProxyPushSupplier_ptr> (servant);
+
+ return static_cast<TAO_FTEC_ProxyPushSupplier*> (obj);
+ }
+ catch (...){
+ }
+ return 0;
+}
+
+TAO_FTEC_ProxyPushConsumer*
+TAO_FTEC_Event_Channel_Impl::find_proxy_push_consumer(const FtRtecEventChannelAdmin::ObjectId& id)
+{
+ try{
+ PortableServer::POA_var poa= supplier_poa();
+
+ const PortableServer::Servant servant = poa->id_to_servant(
+ reinterpret_cast<const PortableServer::ObjectId&> (id));
+
+ POA_RtecEventChannelAdmin::ProxyPushConsumer_ptr obj =
+ dynamic_cast<POA_RtecEventChannelAdmin::ProxyPushConsumer_ptr> (servant);
+
+ return static_cast<TAO_FTEC_ProxyPushConsumer*> (obj);
+ }
+ catch (...){
+ }
+ return 0;
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL