diff options
Diffstat (limited to 'ACE/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp')
-rw-r--r-- | ACE/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp | 1475 |
1 files changed, 1475 insertions, 0 deletions
diff --git a/ACE/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp b/ACE/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp new file mode 100644 index 00000000000..c2c6bf79371 --- /dev/null +++ b/ACE/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp @@ -0,0 +1,1475 @@ +// $Id$ + +#include "orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h" + +#include "ace/Monitor_Point_Registry.h" + +#include "orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.h" +#include "orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.h" +#include "orbsvcs/Notify/MonitorControl/Dynamic_Statistic.h" +#include "orbsvcs/Notify/MonitorControl/Timestamp_Monitor.h" +#include "orbsvcs/Notify/MonitorControl/Control_Registry.h" +#include "orbsvcs/Notify/Buffering_Strategy.h" +#include "orbsvcs/Notify/ProxySupplier.h" +#include "orbsvcs/Notify/ProxyConsumer.h" +#include "orbsvcs/Notify/ThreadPool_Task.h" + +#if defined (TAO_HAS_MONITOR_FRAMEWORK) && (TAO_HAS_MONITOR_FRAMEWORK == 1) + +using namespace ACE_VERSIONED_NAMESPACE_NAME::ACE::Monitor_Control; + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +// ****************************************************************** +// Dynamic Statistic Classes +// ****************************************************************** + +class EventChannelConsumersSuppliers + : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel> +{ +public: + EventChannelConsumersSuppliers (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + Monitor_Control_Types::Information_Type type, + bool is_supplier = false) + : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec, + name.c_str (), + type), + is_supplier_ (is_supplier) + { + } + + virtual void update (void) { + if (this->type () == Monitor_Control_Types::MC_LIST) + { + Monitor_Control_Types::NameList list; + if (this->is_supplier_) + this->interf_->get_suppliers (&list); + else + this->interf_->get_consumers (&list); + this->receive (list); + } + else + { + if (this->is_supplier_) + this->receive (this->interf_->get_suppliers (0)); + else + this->receive (this->interf_->get_consumers (0)); + } + } + +private: + bool is_supplier_; +}; + +class EventChannelTimedoutConsumers + : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel> +{ +public: + EventChannelTimedoutConsumers (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + Monitor_Control_Types::Information_Type type) + : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec, + name.c_str (), + type) + { + } + + virtual void update (void) { + Monitor_Control_Types::NameList list; + this->interf_->get_timedout_consumers (&list); + this->receive (list); + } +}; + +class EventChannelConsumerSupplierAdmins + : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel> +{ +public: + EventChannelConsumerSupplierAdmins (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + Monitor_Control_Types::Information_Type type, + bool is_supplier = false) + : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec, + name.c_str (), + type), + is_supplier_ (is_supplier) + { + } + + virtual void update (void) + { + if (this->type () == Monitor_Control_Types::MC_LIST) + { + Monitor_Control_Types::NameList list; + + if (this->is_supplier_) + { + this->interf_->get_supplieradmins (&list); + } + else + { + this->interf_->get_consumeradmins (&list); + } + + this->receive (list); + } + else + { + if (this->is_supplier_) + { + this->receive (this->interf_->get_supplieradmins (0)); + } + else + { + this->receive (this->interf_->get_consumeradmins (0)); + } + } + } + +private: + bool is_supplier_; +}; + +class QueuedEvents + : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel> +{ +public: + QueuedEvents (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + Monitor_Control_Types::Information_Type type, + bool count) + : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec, + name.c_str (), + type), + count_ (count) + { + } + + virtual void update (void) + { + this->receive (this->interf_->calculate_queue_size (this->count_)); + } + +private: + bool count_; +}; + +class OldestEvent + : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel> +{ +public: + OldestEvent (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + Monitor_Control_Types::Information_Type type) + : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec, + name.c_str (), + type) + { + } + + virtual void update (void) + { + this->receive (this->interf_->get_oldest_event ()); + } +}; + +class SlowestConsumers + : public TAO_Dynamic_Statistic<TAO_MonitorEventChannel> +{ +public: + SlowestConsumers (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + Monitor_Control_Types::Information_Type type) + : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec, + name.c_str (), + type) + { + } + + virtual void update (void) + { + Monitor_Control_Types::NameList list; + this->interf_->determine_slowest_consumer (&list); + this->receive (list); + } +}; + +class QueueOverflows: + public Monitor_Base +{ +public: + QueueOverflows (const ACE_CString& name) + : Monitor_Base ( + name.c_str (), + Monitor_Control_Types::MC_COUNTER) { + } +}; + +class ShutdownControl : public TAO_NS_Control +{ +public: + ShutdownControl (TAO_MonitorEventChannel* ec, + const ACE_CString& name) + : TAO_NS_Control (name.c_str ()), + ec_ (ec) + { + } + + virtual bool execute (const char* command) + { + if (ACE_OS::strcmp (command, TAO_NS_CONTROL_SHUTDOWN) == 0) + { + this->ec_->destroy (); + } + else + { + return false; + } + + return true; + } + +private: + TAO_MonitorEventChannel* ec_; +}; + +class RemoveConsumerSupplierControl : public TAO_NS_Control +{ +public: + RemoveConsumerSupplierControl (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + CosNotifyChannelAdmin::ProxyID id, + bool is_supplier) + : TAO_NS_Control (name.c_str ()), + ec_ (ec), + id_ (id), + is_supplier_ (is_supplier) + { + } + + virtual bool execute (const char* command) + { + bool status = true; + + if (this->is_supplier_) + { + if (ACE_OS::strcmp (command, TAO_NS_CONTROL_REMOVE_SUPPLIER) == 0) + { + status = this->ec_->destroy_supplier (this->id_); + } + else + { + // Indicate that the command was unknown. + return false; + } + } + else + { + if (ACE_OS::strcmp (command, TAO_NS_CONTROL_REMOVE_CONSUMER) == 0) + { + status = this->ec_->destroy_consumer (this->id_); + } + else + { + // Indicate that the command was unknown. + return false; + } + } + + return status; + } + +private: + TAO_MonitorEventChannel* ec_; + CosNotifyChannelAdmin::ProxyID id_; + bool is_supplier_; +}; + +// ****************************************************************** +// TAO_MonitorEventChannel Methods +// ****************************************************************** + +TAO_MonitorEventChannel::TAO_MonitorEventChannel (const char* name) + : name_ (name) +{ + this->add_stats (); +} + +TAO_MonitorEventChannel::~TAO_MonitorEventChannel (void) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_); + + Monitor_Point_Registry* instance = Monitor_Point_Registry::instance (); + size_t size = this->stat_names_.size (); + + for (size_t i = 0; i < size; ++i) + { + instance->remove (this->stat_names_[i].c_str ()); + } + + TAO_Control_Registry* cinstance = TAO_Control_Registry::instance (); + size = this->control_names_.size (); + + for (size_t i = 0; i < size; ++i) + { + cinstance->remove (this->control_names_[i]); + } +} + +const ACE_CString& +TAO_MonitorEventChannel::name (void) const +{ + return this->name_; +} + +bool +TAO_MonitorEventChannel::register_statistic (const ACE_CString& name, + Monitor_Base* stat) +{ + // Add the statistic to the registry and + // add the name to the statistic names list. + bool added = Monitor_Point_Registry::instance ()->add (stat); + + if (added) + { + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->names_mutex_, added); + + this->stat_names_.push_back (name); + } + + return added; +} + +bool +TAO_MonitorEventChannel::unregister_statistic (const ACE_CString& name) +{ + // Remove the statistic from the registry and if it's successful + // remove the name from the statistic names list. + bool removed = Monitor_Point_Registry::instance ()->remove (name.c_str ()); + + if (removed) + { + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->names_mutex_, removed); + + this->remove_list_name (this->stat_names_, name); + } + + return removed; +} + +void +TAO_MonitorEventChannel::map_supplier_proxy ( + CosNotifyChannelAdmin::ProxyID id, + const ACE_CString& name) +{ + if (name.length () == 0) + { + throw NotifyMonitoringExt::NameMapError (); + } + + ACE_CString full = this->name_ + "/" + name; + + ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->supplier_mutex_); + + if (this->is_duplicate_name (this->supplier_map_, full)) + { + throw NotifyMonitoringExt::NameAlreadyUsed (); + } + + if (this->supplier_map_.bind (id, full) != 0) + { + throw NotifyMonitoringExt::NameMapError (); + } + + // The view from the outside sees a supplier proxy as a consumer. + // So, we'll register a RemoveConsumerSupplierControl for each supplier + // proxy indicating a consumer (false) + RemoveConsumerSupplierControl* rcsc = 0; + ACE_NEW_THROW_EX (rcsc, + RemoveConsumerSupplierControl (this, full, id, false), + CORBA::NO_MEMORY ()); + + TAO_Control_Registry* cinstance = TAO_Control_Registry::instance (); + + if (cinstance->add (rcsc)) + { + ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_); + + this->control_names_.push_back (full); + } + else + { + delete rcsc; + ACE_ERROR ((LM_ERROR, + "Unable to add control: %s\n", + full.c_str ())); + } +} + +void +TAO_MonitorEventChannel::map_consumer_proxy ( + CosNotifyChannelAdmin::ProxyID id, + const ACE_CString& name) +{ + if (name.length () == 0) + { + throw NotifyMonitoringExt::NameMapError (); + } + + ACE_CString full = this->name_ + "/" + name; + + ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->consumer_mutex_); + + if (this->is_duplicate_name (this->consumer_map_, full)) + { + throw NotifyMonitoringExt::NameAlreadyUsed (); + } + + if (this->consumer_map_.bind (id, full) != 0) + { + throw NotifyMonitoringExt::NameMapError (); + } + + // The view from the outside sees a consumer proxy as a supplier. + // So, we'll register a RemoveConsumerSupplierControl for each consumer + // proxy indicating a supplier (true) + RemoveConsumerSupplierControl* rcsc = 0; + ACE_NEW_THROW_EX (rcsc, + RemoveConsumerSupplierControl (this, full, id, true), + CORBA::NO_MEMORY ()); + + TAO_Control_Registry* cinstance = TAO_Control_Registry::instance (); + + if (cinstance->add (rcsc)) + { + ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_); + + this->control_names_.push_back (full); + } + else + { + delete rcsc; + ACE_ERROR ((LM_ERROR, + "Unable to add control: %s\n", + full.c_str ())); + } +} + +void +TAO_MonitorEventChannel::cleanup_proxy (CosNotifyChannelAdmin::ProxyID id, + bool is_supplier, + bool experienced_timeout) +{ + ACE_CString name; + + if (is_supplier) + { + ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->supplier_mutex_); + + // It may seem like a good idea to throw a + // NotifyMonitoringExt::NameMapError() exception if we can't unbind the + // id from the map, but we can't. This method is called indirectly + // from CORBA calls (e.g., disconnect_structured_push_supplier) and at + // best it will cause a CORBA::UNKNOWN exception and at worst will + // cause an unexpected exception call which will cause the program to + // abort. + this->supplier_map_.unbind (id, name); + + // If this supplier proxy is being cleaned up because it + // experienced a timeout, we need to add it to the timeout supplier + // map. + if (experienced_timeout && name.length () != 0) + { + ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, tguard, + this->timedout_supplier_mutex_); + this->timedout_supplier_map_.bind (id, name); + } + } + else + { + ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->consumer_mutex_); + + // The same goes for this one too. + this->consumer_map_.unbind (id, name); + } + + if (name.length () != 0) + { + TAO_Control_Registry* cinstance = TAO_Control_Registry::instance (); + cinstance->remove (name); + + ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_); + + this->remove_list_name (this->control_names_, name); + } +} + +void +TAO_MonitorEventChannel::remove_consumeradmin ( + CosNotifyChannelAdmin::ProxyID id) +{ + ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->consumeradmin_mutex_); + + this->consumeradmin_map_.unbind (id); +} + +void +TAO_MonitorEventChannel::remove_supplieradmin ( + CosNotifyChannelAdmin::ProxyID id) +{ + ACE_WRITE_GUARD (TAO_SYNCH_RW_MUTEX, guard, this->supplieradmin_mutex_); + + this->supplieradmin_map_.unbind (id); +} + +void +TAO_MonitorEventChannel::add_stats (const char* name) +{ + if (name != 0 && this->name_.length () == 0) + { + this->name_ = name; + } + + if (this->name_.length () != 0) + { + ACE_CString dir_name (this->name_ + "/"); + ACE_CString stat_name = dir_name + + NotifyMonitoringExt::EventChannelCreationTime; + Monitor_Base* timestamp = 0; + ACE_NEW_THROW_EX (timestamp, + Timestamp_Monitor (stat_name.c_str ()), + CORBA::NO_MEMORY ()); + ACE_Time_Value tv (ACE_OS::gettimeofday ()); + timestamp->receive (tv.sec () + (tv.usec () / 1000000.0)); + + if (!this->register_statistic (stat_name, timestamp)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + timestamp->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelConsumerCount; + EventChannelConsumersSuppliers* consumers = 0; + ACE_NEW_THROW_EX (consumers, + EventChannelConsumersSuppliers ( + this, + stat_name.c_str (), + Monitor_Control_Types::MC_NUMBER), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, consumers)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + consumers->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelConsumerNames; + consumers = 0; + ACE_NEW_THROW_EX (consumers, + EventChannelConsumersSuppliers ( + this, + stat_name.c_str (), + Monitor_Control_Types::MC_LIST), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, consumers)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + consumers->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelTimedoutConsumerNames; + EventChannelTimedoutConsumers* tconsumers = 0; + ACE_NEW_THROW_EX (tconsumers, + EventChannelTimedoutConsumers ( + this, + stat_name.c_str (), + Monitor_Control_Types::MC_LIST), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, tconsumers)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + tconsumers->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelSupplierCount; + EventChannelConsumersSuppliers* suppliers = 0; + ACE_NEW_THROW_EX (suppliers, + EventChannelConsumersSuppliers ( + this, + stat_name.c_str (), + Monitor_Control_Types::MC_NUMBER, + true), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, suppliers)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + suppliers->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelSupplierNames; + suppliers = 0; + ACE_NEW_THROW_EX (suppliers, + EventChannelConsumersSuppliers ( + this, + stat_name.c_str (), + Monitor_Control_Types::MC_LIST, + true), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, suppliers)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + suppliers->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelConsumerAdminCount; + EventChannelConsumerSupplierAdmins* conadmins = 0; + ACE_NEW_THROW_EX (conadmins, + EventChannelConsumerSupplierAdmins ( + this, + stat_name.c_str (), + Monitor_Control_Types::MC_NUMBER), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, conadmins)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + conadmins->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelConsumerAdminNames; + conadmins = 0; + ACE_NEW_THROW_EX (conadmins, + EventChannelConsumerSupplierAdmins ( + this, + stat_name.c_str (), + Monitor_Control_Types::MC_LIST), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, conadmins)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + conadmins->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelSupplierAdminCount; + EventChannelConsumerSupplierAdmins* supadmins = 0; + ACE_NEW_THROW_EX (supadmins, + EventChannelConsumerSupplierAdmins ( + this, + stat_name.c_str (), + Monitor_Control_Types::MC_NUMBER, + true), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, supadmins)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + supadmins->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelSupplierAdminNames; + supadmins = 0; + ACE_NEW_THROW_EX (supadmins, + EventChannelConsumerSupplierAdmins ( + this, + stat_name.c_str (), + Monitor_Control_Types::MC_LIST, + true), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, supadmins)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + supadmins->remove_ref (); + + stat_name = dir_name + NotifyMonitoringExt::EventChannelQueueSize; + QueuedEvents* events = 0; + ACE_NEW_THROW_EX (events, + QueuedEvents (this, + stat_name.c_str (), + Monitor_Control_Types::MC_NUMBER, + false), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, events)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + events->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelQueueElementCount; + events = 0; + ACE_NEW_THROW_EX (events, + QueuedEvents (this, + stat_name.c_str (), + Monitor_Control_Types::MC_NUMBER, + true), + CORBA::NO_MEMORY ()); + + events->add_to_registry (); + events->remove_ref (); + + stat_name = dir_name + NotifyMonitoringExt::EventChannelOldestEvent; + OldestEvent* oldest = 0; + ACE_NEW_THROW_EX (oldest, + OldestEvent (this, stat_name.c_str (), + Monitor_Control_Types::MC_TIME), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, oldest)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + oldest->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelSlowestConsumers; + SlowestConsumers* slowest = 0; + ACE_NEW_THROW_EX (slowest, + SlowestConsumers (this, + stat_name.c_str (), + Monitor_Control_Types::MC_LIST), + CORBA::NO_MEMORY ()); + + if (!this->register_statistic (stat_name, slowest)) + { + ACE_ERROR ((LM_ERROR, + "Unable to add statistic %s\n", + stat_name.c_str ())); + } + + // Registry manages refcount, so we do this regardless. + slowest->remove_ref (); + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelQueueOverflows; + QueueOverflows* overflows = 0; + ACE_NEW_THROW_EX (overflows, + QueueOverflows (stat_name.c_str ()), + CORBA::NO_MEMORY ()); + if (!this->register_statistic (stat_name, overflows)) + { + delete overflows; + ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", + stat_name.c_str ())); + } + + TAO_Control_Registry* cinstance = + TAO_Control_Registry::instance (); + + ShutdownControl* sd = 0; + ACE_NEW_THROW_EX (sd, + ShutdownControl (this, + this->name_), + CORBA::NO_MEMORY ()); + + if (cinstance->add (sd)) + { + ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->names_mutex_); + this->control_names_.push_back (this->name_); + } + else + { + delete sd; + ACE_ERROR ((LM_ERROR, + "Unable to add control: %s\n", + this->name_.c_str ())); + } + } +} + +CosNotifyChannelAdmin::ConsumerAdmin_ptr +TAO_MonitorEventChannel::new_for_consumers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id) +{ + return this->named_new_for_consumers (op, id, 0); +} + +CosNotifyChannelAdmin::ConsumerAdmin_ptr +TAO_MonitorEventChannel::named_new_for_consumers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id, + const char* name) +{ + if (name != 0 && name[0] == 0) + { + throw NotifyMonitoringExt::NameMapError (); + } + + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + this->TAO_Notify_EventChannel::new_for_consumers (op, id); + + // WARNING: Internal knowledge of TAO. + TAO_MonitorConsumerAdmin* low_admin = + dynamic_cast<TAO_MonitorConsumerAdmin*> (admin->_servant ()); + + if (low_admin == 0) + { + // This shouldn't happen. + throw CORBA::INTERNAL (); + } + else + { + // Build up the full name. + ACE_CString full = this->name_ + "/"; + + if (name == 0) + { + char idname[64]; + ACE_OS::sprintf (idname, "%d", id); + full += idname; + } + else + { + full += name; + } + + ACE_WRITE_GUARD_RETURN (TAO_SYNCH_RW_MUTEX, guard, + this->consumeradmin_mutex_, + CosNotifyChannelAdmin::ConsumerAdmin::_nil ()); + + if (this->is_duplicate_name (this->consumeradmin_map_, full)) + { + throw NotifyMonitoringExt::NameAlreadyUsed (); + } + + if (this->consumeradmin_map_.bind (id, full) != 0) + { + throw NotifyMonitoringExt::NameMapError (); + } + + low_admin->register_stats_controls (this, full); + } + + return admin._retn (); +} + +CosNotifyChannelAdmin::SupplierAdmin_ptr +TAO_MonitorEventChannel::new_for_suppliers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id) +{ + return this->named_new_for_suppliers (op, id, 0); +} + +CosNotifyChannelAdmin::SupplierAdmin_ptr +TAO_MonitorEventChannel::named_new_for_suppliers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id, + const char* name) +{ + if (name != 0 && name[0] == 0) + { + throw NotifyMonitoringExt::NameMapError (); + } + + CosNotifyChannelAdmin::SupplierAdmin_var admin = + this->TAO_Notify_EventChannel::new_for_suppliers (op, id); + + // WARNING: Internal knowledge of TAO. + TAO_MonitorSupplierAdmin* low_admin = + dynamic_cast<TAO_MonitorSupplierAdmin*> (admin->_servant ()); + + if (low_admin == 0) + { + // This shouldn't happen + throw CORBA::INTERNAL (); + } + else + { + // Build up the full name + ACE_CString full = this->name_ + "/"; + + if (name == 0) + { + char idname[64]; + ACE_OS::sprintf (idname, "%d", id); + full += idname; + } + else + { + full += name; + } + + ACE_WRITE_GUARD_RETURN (TAO_SYNCH_RW_MUTEX, guard, + this->supplieradmin_mutex_, + CosNotifyChannelAdmin::SupplierAdmin::_nil ()); + + if (this->is_duplicate_name (this->supplieradmin_map_, full)) + { + throw NotifyMonitoringExt::NameAlreadyUsed (); + } + + if (this->supplieradmin_map_.bind (id, full) != 0) + { + throw NotifyMonitoringExt::NameMapError (); + } + + low_admin->register_stats_controls (this, full); + } + + return admin._retn (); +} + +size_t +TAO_MonitorEventChannel::get_consumers ( + Monitor_Control_Types::NameList* names) +{ + size_t count = 0; + CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids = + this->get_all_consumeradmins (); + CORBA::ULong length = conadmin_ids->length (); + + for (CORBA::ULong j = 0; j < length; ++j) + { + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + this->get_consumeradmin (conadmin_ids[j]); + + if (!CORBA::is_nil (admin.in ())) + { + CosNotifyChannelAdmin::ProxyIDSeq_var proxys = + admin->push_suppliers (); + CORBA::ULong plen = proxys->length (); + + if (plen > 0) + { + ACE_READ_GUARD_RETURN (TAO_SYNCH_RW_MUTEX, + guard, + this->supplier_mutex_, + 0); + + for (CORBA::ULong i = 0; i < plen; ++i) + { + if (names == 0) + { + if (this->supplier_map_.find (proxys[i]) == 0) + { + count++; + } + } + else + { + ACE_CString name; + + if (this->supplier_map_.find (proxys[i], name) == 0) + { + count++; + names->push_back (name); + } + } + } + } + } + } + + return count; +} + +void +TAO_MonitorEventChannel::get_timedout_consumers ( + Monitor_Control_Types::NameList* names) +{ + ACE_READ_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->timedout_supplier_mutex_); + + Map::const_iterator itr (this->timedout_supplier_map_); + Map::value_type* entry = 0; + while (itr.next (entry)) + { + names->push_back (entry->item ()); + itr.advance (); + } +} + +size_t +TAO_MonitorEventChannel::get_suppliers ( + Monitor_Control_Types::NameList* names) +{ + size_t count = 0; + CosNotifyChannelAdmin::AdminIDSeq_var supadmin_ids = + this->get_all_supplieradmins (); + CORBA::ULong length = supadmin_ids->length (); + + for (CORBA::ULong j = 0; j < length; ++j) + { + CosNotifyChannelAdmin::SupplierAdmin_var admin = + this->get_supplieradmin (supadmin_ids[j]); + + if (!CORBA::is_nil (admin.in ())) + { + CosNotifyChannelAdmin::ProxyIDSeq_var proxys = + admin->push_consumers (); + CORBA::ULong plen = proxys->length (); + + if (plen > 0) + { + ACE_READ_GUARD_RETURN (TAO_SYNCH_RW_MUTEX, + guard, + this->consumer_mutex_, + 0); + + for (CORBA::ULong i = 0; i < plen; ++i) + { + if (names == 0) + { + if (this->consumer_map_.find (proxys[i]) == 0) + { + count++; + } + } + else + { + ACE_CString name; + + if (this->consumer_map_.find (proxys[i], name) == 0) + { + count++; + names->push_back (name); + } + } + } + } + } + } + + return count; +} + +size_t +TAO_MonitorEventChannel::get_consumeradmins ( + Monitor_Control_Types::NameList* names) +{ + ACE_READ_GUARD_RETURN (TAO_SYNCH_RW_MUTEX, + guard, + this->consumeradmin_mutex_, + 0); + + CosNotifyChannelAdmin::AdminIDSeq_var admin_ids = + this->get_all_consumeradmins (); + + return this->get_admins (this->consumeradmin_map_, + admin_ids.in (), + names); +} + +size_t +TAO_MonitorEventChannel::get_supplieradmins ( + Monitor_Control_Types::NameList* names) +{ + ACE_READ_GUARD_RETURN (TAO_SYNCH_RW_MUTEX, + guard, + this->supplieradmin_mutex_, + 0); + + CosNotifyChannelAdmin::AdminIDSeq_var admin_ids = + this->get_all_supplieradmins (); + + return this->get_admins (this->supplieradmin_map_, + admin_ids.in (), + names); +} + +size_t +TAO_MonitorEventChannel::get_admins ( + TAO_MonitorEventChannel::Map& map, + const CosNotifyChannelAdmin::AdminIDSeq& admin_ids, + Monitor_Control_Types::NameList* names) +{ + size_t count = 0; + CORBA::ULong length = admin_ids.length (); + + for (CORBA::ULong j = 0; j < length; ++j) + { + if (names == 0) + { + if (map.find (admin_ids[j]) == 0) + { + count++; + } + } + else + { + ACE_CString name; + + if (map.find (admin_ids[j], name) == 0) + { + count++; + names->push_back (name); + } + } + } + + return count; +} + +TAO_Notify_ThreadPool_Task* +TAO_MonitorEventChannel::get_threadpool_task ( + CosNotifyChannelAdmin::AdminID id) +{ + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + this->get_consumeradmin (id); + + if (!CORBA::is_nil (admin.in ())) + { + // WARNING: Internal knowledge of TAO and the Notification + // Service implementation. + TAO_Notify_ConsumerAdmin* low_admin = + dynamic_cast<TAO_Notify_ConsumerAdmin*> (admin->_servant ()); + + if (low_admin != 0) + { + return + dynamic_cast<TAO_Notify_ThreadPool_Task*> ( + low_admin->get_worker_task ()); + } + } + + return 0; +} + +size_t +TAO_MonitorEventChannel::calculate_queue_size (bool count) +{ + size_t size = 0; + CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids = + this->get_all_consumeradmins (); + CORBA::ULong length = conadmin_ids->length (); + + for (CORBA::ULong j = 0; j < length; j++) + { + TAO_Notify_ThreadPool_Task* task = + this->get_threadpool_task (conadmin_ids[j]); + + if (task != 0) + { + TAO_Notify_Message_Queue* queue = task->msg_queue (); + + if (count) + { + size += queue->message_count (); + } + else + { + // The message blocks stored in this queue are of size + // zero. However, each message block is a + // TAO_Notify_Event which has an associated set of data + // which can be used to estimate the amount of memory + // allocated to the message queue + size += (queue->message_count () /* * sizeof (TAO_Notify_Event) */); + } + } + } + + return size; +} + +void +TAO_MonitorEventChannel::determine_slowest_consumer ( + Monitor_Control_Types::NameList* names) +{ + size_t largest = 0; + CosNotifyChannelAdmin::AdminID id = 0; + + CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids = + this->get_all_consumeradmins (); + CORBA::ULong length = conadmin_ids->length (); + + for (CORBA::ULong j = 0; j < length; ++j) + { + TAO_Notify_ThreadPool_Task* task = + this->get_threadpool_task (conadmin_ids[j]); + + if (task != 0) + { + TAO_Notify_Message_Queue* queue = task->msg_queue (); + size_t count = queue->message_count (); + + if (count > largest) + { + largest = count; + id = conadmin_ids[j]; + } + } + } + + if (largest > 0) + { + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + this->get_consumeradmin (id); + + if (!CORBA::is_nil (admin.in ())) + { + CosNotifyChannelAdmin::ProxyIDSeq_var proxys = + admin->push_suppliers (); + CORBA::ULong plen = proxys->length (); + + if (plen > 0) + { + ACE_READ_GUARD (TAO_SYNCH_RW_MUTEX, + guard, + this->supplier_mutex_); + + for (CORBA::ULong i = 0; i < plen; ++i) + { + ACE_CString name; + + if (this->supplier_map_.find (proxys[i], name) == 0) + { + names->push_back (name); + } + } + } + } + } +} + +double +TAO_MonitorEventChannel::get_oldest_event (void) +{ + CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids = + this->get_all_consumeradmins (); + CORBA::ULong length = conadmin_ids->length (); + + // If there are no consumers, we will return the special case indicator. + if (length == 0) + { + return 0.0; + } + + // Loop through each consumer admins thread pool task and have the + // buffering strategy give us the time of the oldest event. + ACE_Time_Value tv (ACE_Time_Value::max_time); + + for (CORBA::ULong j = 0; j < length; ++j) + { + TAO_Notify_ThreadPool_Task* task = + this->get_threadpool_task (conadmin_ids[j]); + + if (task != 0) + { + ACE_Time_Value old = + task->buffering_strategy ()->oldest_event (); + + if (old < tv) + { + tv = old; + } + } + } + + // If there was no message in any of the queues, our time value will be + // ACE_Time_Value::max_time. For this situation, we must return the + // special case indicator. + if (tv == ACE_Time_Value::max_time) + { + return 0.0; + } + + return tv.sec () + (tv.usec () / 1000000.0); +} + +bool +TAO_MonitorEventChannel::is_duplicate_name ( + const TAO_MonitorEventChannel::Map& map, + const ACE_CString& name) const +{ + Map::const_iterator itr (map); + Map::value_type* entry = 0; + + while (itr.next (entry)) + { + if (name == entry->item ()) + { + return true; + } + + itr.advance (); + } + + return false; +} + +void +TAO_MonitorEventChannel::remove_list_name ( + Monitor_Control_Types::NameList& list, + const ACE_CString& name) +{ + size_t size = list.size (); + + for (size_t i = 0; i < size; ++i) + { + if (list[i] == name) + { + if (size == 1) + { + list.clear (); + } + else + { + if (i != size - 1) + { + list[i] = list[size - 1]; + } + + list.resize (size - 1, ""); + } + + break; + } + } +} + +bool +TAO_MonitorEventChannel::destroy_consumer ( + CosNotifyChannelAdmin::ProxyID id) +{ + // Since we just have a proxy id, we have to search through all of the + // consumer admins. We have no idea to which the proxy will belong. + CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids = + this->get_all_consumeradmins (); + CORBA::ULong length = conadmin_ids->length (); + + for (CORBA::ULong j = 0; j < length; ++j) + { + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + this->get_consumeradmin (conadmin_ids[j]); + + if (!CORBA::is_nil (admin.in ())) + { + try + { + CosNotifyChannelAdmin::ProxySupplier_var supplier = + admin->get_proxy_supplier(id); + + if (!CORBA::is_nil(supplier.in())) + { + // WARNING: Internal knowledge of TAO and the + // Notification Service implementation. + TAO_Notify_ProxySupplier* low_proxy = + dynamic_cast<TAO_Notify_ProxySupplier*> ( + supplier->_servant ()); + + low_proxy->destroy (); + return true; + } + } + catch(const CosNotifyChannelAdmin::ProxyNotFound&) + { + } + } + } + + return false; +} + +bool +TAO_MonitorEventChannel::destroy_supplier ( + CosNotifyChannelAdmin::ProxyID id) +{ + // Since we just have a proxy id, we have to search through all of the + // supplier admins. We have no idea to which the proxy will belong. + CosNotifyChannelAdmin::AdminIDSeq_var supadmin_ids = + this->get_all_supplieradmins (); + CORBA::ULong length = supadmin_ids->length (); + + for (CORBA::ULong j = 0; j < length; ++j) + { + CosNotifyChannelAdmin::SupplierAdmin_var admin = + this->get_supplieradmin (supadmin_ids[j]); + + if (!CORBA::is_nil (admin.in ())) + { + try + { + CosNotifyChannelAdmin::ProxyConsumer_var consumer = + admin->get_proxy_consumer(id); + + if (!CORBA::is_nil(consumer.in())) + { + // WARNING: Internal knowledge of TAO and the + // Notification Service implementation. + TAO_Notify_ProxyConsumer* low_proxy = + dynamic_cast<TAO_Notify_ProxyConsumer*> ( + consumer->_servant ()); + + low_proxy->destroy (); + return true; + } + } + catch(const CosNotifyChannelAdmin::ProxyNotFound&) + { + } + } + } + + return false; +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#endif /* TAO_HAS_MONITOR_FRAMEWORK==1 */ + |