diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel.cpp | 427 |
1 files changed, 427 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel.cpp new file mode 100644 index 00000000000..f10dcae7493 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/FtRtEvent/EventChannel/FTEC_Event_Channel.cpp @@ -0,0 +1,427 @@ +// $Id$ + +#include "ace/Dynamic_Service.h" +#include "FTEC_Event_Channel.h" +#include "FTEC_Event_Channel_Impl.h" +#include "../Utils/activate_with_id.h" +#include "../Utils/resolve_init.h" +#include "../Utils/UUID.h" +#include "Fault_Detector_Loader.h" +#include "Fault_Detector.h" +#include "Request_Context_Repository.h" +#include "Replication_Service.h" +#include "Identification_Service.h" +#include "create_persistent_poa.h" +#include "tao/Utils/PolicyList_Destroyer.h" +#include "GroupInfoPublisher.h" + +ACE_RCSID (EventChannel, + TAO_FTEC_Event_Channel, + "$Id$") + +TAO_FTEC_Event_Channel::TAO_FTEC_Event_Channel(CORBA::ORB_var orb, + PortableServer::POA_var poa) +: orb_(orb) +, poa_(poa) +, ec_impl_(NULL) +{ +} + +TAO_FTEC_Event_Channel::~TAO_FTEC_Event_Channel() +{ + delete ec_impl_; +} + + +void setup_object_group(TAO_FTEC_Event_Channel* es, + CosNaming::NamingContext_ptr naming_context, + TAO_FTEC_Event_Channel::MEMBERSHIP membership, + FtRtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + if (membership != TAO_FTEC_Event_Channel::NONE) {// register to naming service + FTRT::ManagerInfoList member_list; + member_list.length(1); + member_list[0].the_location = Fault_Detector::instance()->my_location(); + member_list[0].ior = FTRT::ObjectGroupManager::_duplicate(ec); + + if (membership == TAO_FTEC_Event_Channel::PRIMARY) + es->create_group(member_list, 0 + ACE_ENV_ARG_PARAMETER); + + else { // BACKUP + FtRtecEventChannelAdmin::EventChannel_var primary = + resolve<FtRtecEventChannelAdmin::EventChannel>(naming_context, + FTRTEC::Identification_Service::instance()->name() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_DEBUG((LM_DEBUG, "Got Primary address from Naming Service\n")); + + primary->join_group(member_list[0] ACE_ENV_ARG_PARAMETER); + } + ACE_CHECK; + } +} + + +FtRtecEventChannelAdmin::EventChannel_ptr +TAO_FTEC_Event_Channel::activate(TAO_FTEC_Event_Channel::MEMBERSHIP membership + ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS) +{ + FTRTEC::Fault_Detector_Loader* detector_loader = + ACE_Dynamic_Service<FTRTEC::Fault_Detector_Loader>::instance("FTRTEC_Fault_Detector"); + + detector_loader->init(0, 0); + + if (FTRTEC::Identification_Service::instance() == 0) + ACE_ERROR_RETURN((LM_ERROR, "No Identification\n"), 0); + + // initialize naming_contex + CosNaming::NamingContext_var naming_context + = resolve_init<CosNaming::NamingContext>(orb_.in(), "NameService" + ACE_ENV_ARG_PARAMETER); + // initialize group info publisher + GroupInfoPublisher::instance()->set_naming_context(naming_context); + + if (FTRTEC::Replication_Service::instance()->init(0,0) == -1) + return 0; + + GroupInfoPublisher::instance()->subscribe(FTRTEC::Replication_Service::instance()); + + Request_Context_Repository().init(orb_.in()); + + // get POAManager + PortableServer::POAManager_var mgr = poa_->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN(0); + + TAO::Utils::PolicyList_Destroyer policy_list(2); + + persistent_poa_ = + create_persistent_poa(poa_, mgr, "FTEC_Persistant_POA", policy_list ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN(0); + + // Activate the Event channel implementation + + TAO_EC_Event_Channel_Attributes attr (persistent_poa_.in (), + persistent_poa_.in ()); + + + TAO_FTEC_Event_Channel_Impl* ec; + ACE_NEW_THROW_EX (ec, + TAO_FTEC_Event_Channel_Impl (attr), + CORBA::NO_MEMORY()); + + this->ec_impl_ = ec; + + const PortableServer::ObjectId& object_id + = FTRTEC::Identification_Service::instance()->object_id(); + + PortableServer::ObjectId consumer_admin_object_id(object_id); + consumer_admin_object_id[9]++; + + PortableServer::ObjectId supplier_admin_object_id(consumer_admin_object_id); + supplier_admin_object_id[9]++; + + ec->activate(orb_, + supplier_admin_object_id, + consumer_admin_object_id + ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN(0); + + FtRtecEventChannelAdmin::EventChannel_var result; + activate_object_with_id(result.out(), persistent_poa_, this, object_id + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN(0); + + + setup_object_group(this, + naming_context.in(), + membership, + result.in() + ACE_ENV_ARG_PARAMETER); + return result._retn(); +} + + +void TAO_FTEC_Event_Channel::set_listener(TAO_FTEC_Become_Primary_Listener* listener) +{ + GroupInfoPublisher::instance()->subscribe(listener); +} + +void TAO_FTEC_Event_Channel::set_update ( + const FTRT::State & s + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , FTRT::InvalidUpdate + , FTRT::OutOfSequence + )) +{ + ec_impl_->set_update(s ACE_ENV_ARG_PARAMETER); +} + +void TAO_FTEC_Event_Channel::oneway_set_update ( + const FTRT::State & s + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ec_impl_->set_update(s ACE_ENV_ARG_PARAMETER); +} + + +RtecEventChannelAdmin::ConsumerAdmin_ptr +TAO_FTEC_Event_Channel::for_consumers (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + return ec_impl_->for_consumers(ACE_ENV_SINGLE_ARG_PARAMETER); +} + +RtecEventChannelAdmin::SupplierAdmin_ptr +TAO_FTEC_Event_Channel::for_suppliers (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + return ec_impl_->for_suppliers(ACE_ENV_SINGLE_ARG_PARAMETER); +} + + +void +TAO_FTEC_Event_Channel::set_state (const FTRT::State & s ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((CORBA::SystemException, FTRT::InvalidState)) +{ + ACE_DEBUG((LM_DEBUG, "TAO_FTEC_Event_Channel::set_state\n")); + ec_impl_->set_state(s ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::destroy (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (Fault_Detector::instance()) + Fault_Detector::instance()->stop(); + + ec_impl_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + this->orb_->shutdown (); +} + +RtecEventChannelAdmin::Observer_Handle +TAO_FTEC_Event_Channel::append_observer (RtecEventChannelAdmin::Observer_ptr observer + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR, + RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER)) +{ + /// we have yet to implement the replication of observers + /// throw an exception for the moment + ACE_THROW(RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER()); + ACE_CHECK; + + return this->ec_impl_->append_observer (observer ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::remove_observer (RtecEventChannelAdmin::Observer_Handle handle + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR, + RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER)) +{ + /// we have yet to implement the replication of observers + /// throw an exception for the moment + ACE_THROW(RtecEventChannelAdmin::EventChannel::CANT_REMOVE_OBSERVER()); + ACE_CHECK; + + ec_impl_->remove_observer (handle ACE_ENV_ARG_PARAMETER); +} + + + +CORBA::Boolean +TAO_FTEC_Event_Channel::start ( + FTRT::FaultListener_ptr listener, + FTRT::Location_out location + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return ec_impl_->start(listener, location); +} + +void +TAO_FTEC_Event_Channel::create_group ( + const FTRT::ManagerInfoList & info_list, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , FTRT::PredecessorUnreachable + )) +{ + ec_impl_->create_group(info_list, object_group_ref_version ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::join_group ( + const FTRT::ManagerInfo & info + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ec_impl_->join_group(info ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::add_member ( + const FTRT::ManagerInfo & info, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ec_impl_->add_member(info, object_group_ref_version ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::remove_member ( + const FTRT::Location & crashed_location, + CORBA::ULong object_group_ref_version + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ec_impl_->remove_member(crashed_location, + object_group_ref_version + ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::replica_crashed ( + const FTRT::Location & location + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ec_impl_->replica_crashed(location ACE_ENV_ARG_PARAMETER); +} + + + /// EventChannelFacade Interface + +::FtRtecEventChannelAdmin::ObjectId * +TAO_FTEC_Event_Channel::connect_push_consumer ( + RtecEventComm::PushConsumer_ptr push_consumer, + const RtecEventChannelAdmin::ConsumerQOS & qos + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , RtecEventChannelAdmin::TypeError + )) +{ + return ec_impl_->connect_push_consumer(push_consumer, qos + ACE_ENV_ARG_PARAMETER); +} + + +::FtRtecEventChannelAdmin::ObjectId * +TAO_FTEC_Event_Channel::connect_push_supplier ( + RtecEventComm::PushSupplier_ptr push_supplier, + const RtecEventChannelAdmin::SupplierQOS & qos + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + return ec_impl_->connect_push_supplier(push_supplier, qos + ACE_ENV_ARG_PARAMETER); + +} + +void +TAO_FTEC_Event_Channel::disconnect_push_supplier ( + const FtRtecEventChannelAdmin::ObjectId & oid + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ec_impl_->disconnect_push_supplier(oid + ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::disconnect_push_consumer ( + const FtRtecEventChannelAdmin::ObjectId & oid + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + )) +{ + ec_impl_->disconnect_push_consumer(oid + ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::suspend_push_supplier ( + const FtRtecEventChannelAdmin::ObjectId & oid + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , FtRtecEventComm::InvalidObjectID + )) +{ + ec_impl_->suspend_push_supplier(oid + ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::resume_push_supplier ( + const FtRtecEventChannelAdmin::ObjectId & oid + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , FtRtecEventComm::InvalidObjectID + )) +{ + ec_impl_->resume_push_supplier(oid + ACE_ENV_ARG_PARAMETER); +} + +void +TAO_FTEC_Event_Channel::push ( + const FtRtecEventChannelAdmin::ObjectId & oid, + const RtecEventComm::EventSet & data + ACE_ENV_ARG_DECL_WITH_DEFAULTS + ) + ACE_THROW_SPEC (( + CORBA::SystemException + , FtRtecEventComm::InvalidObjectID + )) +{ + ec_impl_->push(oid, data + ACE_ENV_ARG_PARAMETER); +} + |