diff options
Diffstat (limited to 'ACE/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp')
-rw-r--r-- | ACE/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp | 544 |
1 files changed, 544 insertions, 0 deletions
diff --git a/ACE/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp b/ACE/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp new file mode 100644 index 00000000000..462aa969035 --- /dev/null +++ b/ACE/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp @@ -0,0 +1,544 @@ +// $Id$ + +#include "orbsvcs/Notify/EventChannelFactory.h" + +ACE_RCSID(Notify, + TAO_Notify_EventChannelFactory, + "$Id$") + +#include "orbsvcs/Notify/Properties.h" +#include "orbsvcs/Notify/Factory.h" +#include "orbsvcs/Notify/Builder.h" +#include "orbsvcs/Notify/Topology_Saver.h" +#include "orbsvcs/Notify/Topology_Loader.h" +#include "orbsvcs/Notify/Save_Persist_Worker_T.h" +#include "orbsvcs/Notify/Reconnect_Worker_T.h" +#include "orbsvcs/Notify/Event_Persistence_Strategy.h" +#include "orbsvcs/Notify/Routing_Slip_Persistence_Manager.h" +#include "orbsvcs/Notify/EventChannel.h" +#include "orbsvcs/Notify/Container_T.h" +#include "orbsvcs/Notify/Find_Worker_T.h" +#include "orbsvcs/Notify/Seq_Worker_T.h" +#include "orbsvcs/Notify/POA_Helper.h" +#include "orbsvcs/Notify/Validate_Worker_T.h" +#include "orbsvcs/Notify/Validate_Client_Task.h" +#include "orbsvcs/Notify/FilterFactory.h" + +#include "ace/Dynamic_Service.h" + +#include "tao/debug.h" +//#define DEBUG_LEVEL 9 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +// Include this here since this is the only file that +// requires Topology_Factory. +namespace TAO_Notify +{ + // virtual + Topology_Factory::~Topology_Factory () + { + } +} + +typedef TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel + , CosNotifyChannelAdmin::EventChannel + , CosNotifyChannelAdmin::EventChannel_ptr + , CosNotifyChannelAdmin::ChannelNotFound> +TAO_Notify_EventChannel_Find_Worker; + +typedef TAO_Notify_Seq_Worker_T<TAO_Notify_EventChannel> TAO_Notify_EventChannel_Seq_Worker; + +TAO_Notify_EventChannelFactory::TAO_Notify_EventChannelFactory (void) + : topology_save_seq_ (0) + , topology_factory_(0) + , reconnect_registry_(*this) + , loading_topology_ (false) +{ +} + +TAO_Notify_EventChannelFactory::~TAO_Notify_EventChannelFactory () +{ +} + +void +TAO_Notify_EventChannelFactory::destroy (void) +{ + if (this->shutdown () == 1) + return; + + TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance(); + + // Reset references to CORBA objects. + properties->orb (CORBA::ORB::_nil ()); + properties->default_poa (PortableServer::POA::_nil ()); + + ec_container_.reset( 0 ); +} + +void +TAO_Notify_EventChannelFactory::init (PortableServer::POA_ptr poa) +{ + this->poa_ = PortableServer::POA::_duplicate (poa); + + ACE_ASSERT (this->ec_container_.get() == 0); + + // Init ec_container_ + TAO_Notify_EventChannel_Container* ecc = 0; + ACE_NEW_THROW_EX (ecc, + TAO_Notify_EventChannel_Container (), + CORBA::INTERNAL ()); + this->ec_container_.reset( ecc ); + + this->ec_container().init (); + + TAO_Notify_POA_Helper* object_poa = 0; + + // Bootstrap initial Object POA + ACE_NEW_THROW_EX (object_poa, + TAO_Notify_POA_Helper (), + CORBA::NO_MEMORY ()); + + ACE_Auto_Ptr<TAO_Notify_POA_Helper> auto_object_poa (object_poa); + + ACE_CString poa_name = object_poa->get_unique_id (); +#if defined (CORBA_E_MICRO) + object_poa->init (poa, poa_name.c_str ()); +#else + object_poa->init_persistent (poa, poa_name.c_str ()); +#endif /* CORBA_E_MICRO */ + + this->adopt_poa (auto_object_poa.release ()); + + // Note topology factory is configured separately from the "builder" mediated + // objects since it is independant of the "style" of Notification Service. + this->topology_factory_ = + ACE_Dynamic_Service <TAO_Notify::Topology_Factory>::instance ("Topology_Factory"); + + this->load_topology (); + + this->load_event_persistence (); + + if (TAO_Notify_PROPERTIES::instance()->validate_client() == true) + { + TAO_Notify_validate_client_Task* validate_client_task = 0; + ACE_NEW_THROW_EX (validate_client_task, + TAO_Notify_validate_client_Task (TAO_Notify_PROPERTIES::instance()->validate_client_delay (), + TAO_Notify_PROPERTIES::instance()->validate_client_interval (), + this), + CORBA::INTERNAL ()); + this->validate_client_task_.reset (validate_client_task); + } +} + +void +TAO_Notify_EventChannelFactory::_add_ref (void) +{ + this->_incr_refcnt (); +} + +void +TAO_Notify_EventChannelFactory::_remove_ref (void) +{ + this->_decr_refcnt (); +} + +void +TAO_Notify_EventChannelFactory::release (void) +{ + delete this; + //@@ inform factory +} + +void +TAO_Notify_EventChannelFactory::remove (TAO_Notify_EventChannel* event_channel) +{ + this->ec_container().remove (event_channel); + this->self_change (); +} + +int +TAO_Notify_EventChannelFactory::shutdown (void) +{ + this->stop_validator(); + + if (TAO_Notify_Object::shutdown () == 1) + return 1; + + this->ec_container().shutdown (); + + return 0; +} + + +CosNotifyChannelAdmin::EventChannel_ptr +TAO_Notify_EventChannelFactory::create_named_channel ( + const CosNotification::QoSProperties& initial_qos, + const CosNotification::AdminProperties& initial_admin, + CosNotifyChannelAdmin::ChannelID_out id, + const char*) +{ + return this->create_channel (initial_qos, initial_admin, id); +} + + +::CosNotifyChannelAdmin::EventChannel_ptr TAO_Notify_EventChannelFactory::create_channel ( + const CosNotification::QoSProperties & initial_qos, + const CosNotification::AdminProperties & initial_admin, + CosNotifyChannelAdmin::ChannelID_out id + ) +{ + CosNotifyChannelAdmin::EventChannel_var ec = + TAO_Notify_PROPERTIES::instance()->builder()->build_event_channel (this + , initial_qos + , initial_admin + , id); + this->self_change (); + return ec._retn (); +} + +CosNotifyChannelAdmin::ChannelIDSeq* +TAO_Notify_EventChannelFactory::get_all_channels (void) +{ + TAO_Notify_EventChannel_Seq_Worker seq_worker; + + return seq_worker.create (this->ec_container()); +} + +CosNotifyChannelAdmin::EventChannel_ptr +TAO_Notify_EventChannelFactory::get_event_channel (CosNotifyChannelAdmin::ChannelID id) +{ + TAO_Notify_EventChannel_Find_Worker find_worker; + + return find_worker.resolve (id, this->ec_container()); +} + +void +TAO_Notify_EventChannelFactory::set_topology_factory(TAO_Notify::Topology_Factory* f) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P,%t) Debug Topology_Factory installed in EventChannelFactory.\n") + )); + // If the above meessage appears when you don't expect it + // use svc.conf to install the topology factory rather + // than calling this method. + this->topology_factory_ = f; +} + +void +TAO_Notify_EventChannelFactory::load_topology (void) +{ + this->loading_topology_ = true; + if (this->topology_factory_ != 0) + { + // create_loader will open and load the persistence file for validation + auto_ptr<TAO_Notify::Topology_Loader> tl(this->topology_factory_->create_loader()); + if (tl.get () != 0) + { + tl->load (this); + } + } + else + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Topology persistence disabled.\n"))); + } + this->loading_topology_ = false; +} + +void +TAO_Notify_EventChannelFactory::validate () +{ + TAO_Notify::Validate_Worker<TAO_Notify_EventChannel> wrk; + + this->ec_container().collection()->for_each(&wrk); +} + +void +TAO_Notify_EventChannelFactory::stop_validator () +{ + if (this->validate_client_task_.get () != 0) + { + this->validate_client_task_->shutdown (); + } +} + +bool +TAO_Notify_EventChannelFactory::is_persistent () const +{ + return true; +} + +void +TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& saver) +{ + bool changed = this->self_changed_; + this->self_changed_ = false; + this->children_changed_ = false; + + TAO_Notify::NVPList attrs; // ECF has no attributes + + bool want_all_children = + saver.begin_object(0, "channel_factory", attrs, changed); + + // for each deleted child + // delete_child // if the child has persistence. + + TAO_Notify::Save_Persist_Worker<TAO_Notify_EventChannel> wrk(saver, want_all_children); + + this->ec_container().collection()->for_each(&wrk); + + if (want_all_children || this->reconnect_registry_.is_changed ()) + { + this->reconnect_registry_.save_persistent(saver); + } + saver.end_object(0, "channel_factory"); +} + +void +TAO_Notify_EventChannelFactory::load_event_persistence (void) +{ + TAO_Notify::Event_Persistence_Strategy * strategy = + ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence"); + if (strategy != 0) + { + if (this->topology_factory_ != 0) + { + TAO_Notify::Event_Persistence_Factory * factory = strategy->get_factory (); + if (factory != 0) + { + for ( + TAO_Notify::Routing_Slip_Persistence_Manager * rspm = factory->first_reload_manager(); + rspm != 0; + rspm = rspm->load_next ()) + { + TAO_Notify::Routing_Slip_Ptr routing_slip = TAO_Notify::Routing_Slip::create (*this, rspm); + if (!routing_slip.null ()) + { + this->routing_slip_restart_set_.insert (routing_slip); + } + else + { + //@@todo: tell the rspm it's an orphan, but we can't during reload + // we need collect these and come back later to remove them + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Reload persistent event failed.\n") + )); + } + } + } + } + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Notify Service: Configuration error. Event Persistence requires Topology Persistence.\n") + )); + throw CORBA::PERSIST_STORE(); + } + } +} + +bool +TAO_Notify_EventChannelFactory::change_to_parent (void) +{ + bool saving = false; + if (! this->loading_topology_) + { + // A null pointer means that saving of topology is disabled. + if (this->topology_factory_ != 0) + { + saving = true; + // seq is used to check save-in-progress + // if it changes while we're waiting for the lock + // then our change may have already been saved, so + // just return. Caller will signal change again if necessary. + short seq = this->topology_save_seq_; + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->topology_save_lock_, CORBA::INTERNAL ()); + if (seq == this->topology_save_seq_) + { + auto_ptr<TAO_Notify::Topology_Saver> saver(this->topology_factory_->create_saver()); + if (saver.get() != 0) + { + this->save_persistent(*saver); + saver->close (); + } + this->topology_save_seq_ += 1; + } + } + } + return saving; +} + +TAO_Notify::Topology_Object* +TAO_Notify_EventChannelFactory::load_child (const ACE_CString& type, + CORBA::Long id, + const TAO_Notify:: + NVPList& attrs) +{ + // ignore anything but our valid children (ie channel) + TAO_Notify::Topology_Object * result = this; + if (type == "channel") + { + if (DEBUG_LEVEL) ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) EventChannelFactory reload channel %d\n") + , static_cast<int> (id) + )); + + TAO_Notify_Builder* bld = TAO_Notify_PROPERTIES::instance()->builder(); + TAO_Notify_EventChannel * ec = bld->build_event_channel( + this , + id); + + ec->load_attrs (attrs); + + result = ec; + } + else if (type == TAO_Notify::REGISTRY_TYPE) + { + result = & this->reconnect_registry_; + } + return result; +} + +void +TAO_Notify_EventChannelFactory::reconnect (void) +{ + // Reconnect all children first + TAO_Notify::Reconnect_Worker<TAO_Notify_EventChannel> wrk; + + this->ec_container().collection()->for_each(&wrk); + + // Then send reconnection announcement to registered clients + ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ())); + this->reconnect_registry_.send_reconnect (this->channel_factory_.in ()); + + // reactivate events in-progress + Routing_Slip_Set::CONST_ITERATOR iter (this->routing_slip_restart_set_); + TAO_Notify::Routing_Slip_Ptr * routing_slip; + for (iter.first(); iter.next(routing_slip); iter.advance()) + { + (*routing_slip)->reconnect(); + } + this->routing_slip_restart_set_.reset (); +} + +NotifyExt::ReconnectionRegistry::ReconnectionID +TAO_Notify_EventChannelFactory::register_callback ( + NotifyExt::ReconnectionCallback_ptr reconnection) +{ + return this->reconnect_registry_.register_callback ( + reconnection); +} + +void +TAO_Notify_EventChannelFactory::unregister_callback ( + NotifyExt::ReconnectionRegistry::ReconnectionID id) +{ + this->reconnect_registry_.unregister_callback ( + id); +} + +CORBA::Boolean +TAO_Notify_EventChannelFactory::is_alive (void) +{ + return CORBA::Boolean (1); +} + +void +TAO_Notify_EventChannelFactory::save_topology (void) +{ + this->self_change (); +} + +TAO_Notify_ProxyConsumer * +TAO_Notify_EventChannelFactory::find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position) +{ + TAO_Notify_ProxyConsumer * result = 0; + size_t path_size = id_path.size (); + + // EventChannelFactory only: The first id is proably for the ECF itself + // if so, silently consume it. + if (position < path_size && id_path[position] == this->id()) + { + ++position; + } + if (position < path_size) + { + TAO_Notify_EventChannel_Find_Worker find_worker; + + TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container()); + ++position; + if (ec != 0) + { + result = ec->find_proxy_consumer (id_path, position); + } + } + return result; +} + +TAO_Notify_ProxySupplier * +TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position) +{ + TAO_Notify_ProxySupplier * result = 0; + size_t path_size = id_path.size (); + + // EventChannelFactory only: The first id is proably for the ECF itself + // if so, silently consume it. + if (position < path_size && id_path[position] == this->id()) + { + ++position; + } + if (position < path_size) + { + TAO_Notify_EventChannel_Find_Worker find_worker; + TAO_Notify_EventChannel * ec = find_worker.find (id_path[position], this->ec_container()); + ++position; + if (ec != 0) + { + result = ec->find_proxy_supplier (id_path, position); + } + } + return result; +} + +CosNotifyChannelAdmin::EventChannelFactory_ptr +TAO_Notify_EventChannelFactory::activate_self (void) +{ + CORBA::Object_var obj = this->activate (this); + this->channel_factory_ + = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in()); + + try + { + if (DEBUG_LEVEL > 9) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self\n") )); + } + this->reconnect (); + } + catch (const CORBA::Exception&) + { + // ignore for now + } + return this->channel_factory_._retn(); +} + + +TAO_Notify_Object::ID +TAO_Notify_EventChannelFactory::get_id () const +{ + return id(); +} + +TAO_Notify_EventChannelFactory::TAO_Notify_EventChannel_Container& +TAO_Notify_EventChannelFactory::ec_container() +{ + ACE_ASSERT( this->ec_container_.get() != 0 ); + return *ec_container_; +} + + +TAO_END_VERSIONED_NAMESPACE_DECL |