diff options
author | elliott_c <elliott_c@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-03-24 12:30:52 +0000 |
---|---|---|
committer | elliott_c <elliott_c@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-03-24 12:30:52 +0000 |
commit | 12490eebaf6091d95ae78397a9f66aff2e9cf452 (patch) | |
tree | fb6a7bcd98d07ea36b8071e0622423d06f6bef44 /TAO | |
parent | de5dd24900c3eae754faaf77bbda4c687fdf033d (diff) | |
download | ATCD-12490eebaf6091d95ae78397a9f66aff2e9cf452.tar.gz |
ChangeLogTag: Mon Mar 24 12:33:47 UTC 2008 Chad Elliott <elliott_c@ociweb.com>
Diffstat (limited to 'TAO')
26 files changed, 1390 insertions, 201 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index f0ca92c7a5f..75a275c5d14 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,72 @@ +Mon Mar 24 12:33:47 UTC 2008 Chad Elliott <elliott_c@ociweb.com> + + * NEWS: + + Added an entry for this feature addition. + + * docs/notification/using_monitor_extensions.html: + + Documented the ability to create named consumer and supplier + admins. + + * orbsvcs/examples/Notify/MC/TkMonitor/modules/MonitorControl.pm: + + Changed to avoid displaying queue size if the queue size is zero. + + * orbsvcs/examples/Notify/MC/monitor/monitor.cpp: + + Added commands to remove consumer and supplier admins. + + * orbsvcs/orbsvcs/Notify/Admin.cpp: + + Fixed a bug where memory was accessed after it was deleted. + + * orbsvcs/orbsvcs/Notify/Buffering_Strategy.h: + * orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp: + * orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.h: + * orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.cpp: + * orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannelFactory.h: + * orbsvcs/orbsvcs/Notify/MonitorControlExt/NotifyMonitoringExt.idl: + * orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.h: + * orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.cpp: + + Added a statistic to track the queue size for each ConsumerAdmin. + Each time a message is added to or removed from the queue, a queue + measurement is taken. This allows the statistic observer to see + the average queue size for each consumer admin. + + Conversly, the event channel queue size is calculated by adding up + all the consumer admin queues and is only accessed when the + statistic observer requests it. + + * orbsvcs/orbsvcs/Notify/MonitorControl/Control.h: + * orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMC.idl: + * orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.h: + * orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.cpp: + * orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h: + * orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp: + + Added the ability to remove individual consumers and suppliers + from the event channel. Technically, the supplier proxy and + consumer proxy are the objects that are removed. + + Added operations to remove consumer and supplier admins through + the monitor interface. + + * orbsvcs/orbsvcs/Notify/EventChannel.h: + * orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.h: + * orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.cpp: + + Added statistics to track the names and counts of consumer and + supplier admins. Also added code to remove consumer and supplier + admins by name. + + * orbsvcs/tests/Notify/MC/test_monitor.cpp: + * orbsvcs/tests/unit/Notify/MC/Control/Control.cpp: + * orbsvcs/tests/unit/Notify/MC/MonitorControlExt/MonitorControlExt.cpp: + + Updated tests for the changes above. + Mon Mar 24 02:25:48 UTC 2008 Douglas C. Schmidt <schmidt@dre.vanderbilt.edu> * COPYING: Updated the license a bit based on feedback from Tom @@ -11,6 +11,9 @@ USER VISIBLE CHANGES BETWEEN TAO-1.6.3 and TAO-1.6.4 . Improved CORBA/e support and as result the footprint decreased when CORBA/e has been enabled +. Added the ability to remove consumer and supplier proxies and consumer and + supplier admins through the Notify Monitor and Control interface. + USER VISIBLE CHANGES BETWEEN TAO-1.6.2 and TAO-1.6.3 ==================================================== diff --git a/TAO/docs/notification/using_monitor_extensions.html b/TAO/docs/notification/using_monitor_extensions.html index 111fd811146..324fb63cc08 100644 --- a/TAO/docs/notification/using_monitor_extensions.html +++ b/TAO/docs/notification/using_monitor_extensions.html @@ -53,6 +53,20 @@ narrowed to a <b>NotifyMonitoringExt::ConsumerAdmin</b>. NotifyMonitoringExt::ConsumerAdmin::_narrow(admin.in()); </pre> <p> +It is also possible to create named ConsumerAdmin objects using the +following. +</p> +<pre> + CosNotifyChannelAdmin::AdminID admin_id; + + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + ec->named_new_for_consumers(CosNotifyChannelAdmin::OR_OP, admin_id, + "ConsumerAdmin Name"); + + NotifyMonitoringExt::ConsumerAdmin_var madmin = + NotifyMonitoringExt::ConsumerAdmin::_narrow(admin.in()); +</pre> +<p> Once you have a <b>NotifyMonitoringExt::ConsumerAdmin</b> you can then create a named proxy push supplier: </p> @@ -81,6 +95,20 @@ narrowed to a <b>NotifyMonitoringExt::SupplierAdmin</b>. NotifyMonitoringExt::SupplierAdmin::_narrow(admin.in()); </pre> <p> +It is also possible to create named ConsumerAdmin objects using the +following. +</p> +<pre> + CosNotifyChannelAdmin::AdminID admin_id; + + CosNotifyChannelAdmin::SupplierAdmin_var admin = + ec->named_new_for_suppliers(CosNotifyChannelAdmin::AND_OP, admin_id, + "SupplierAdmin Name"); + + NotifyMonitoringExt::SupplierAdmin_var madmin = + NotifyMonitoringExt::SupplierAdmin::_narrow(admin.in()); +</pre> +<p> Once you have a <b>NotifyMonitoringExt::SupplierAdmin</b> you can then create a named proxy push consumer: </p> diff --git a/TAO/orbsvcs/examples/Notify/MC/TkMonitor/modules/MonitorControl.pm b/TAO/orbsvcs/examples/Notify/MC/TkMonitor/modules/MonitorControl.pm index 479a4da9a0e..ecc31284f6a 100644 --- a/TAO/orbsvcs/examples/Notify/MC/TkMonitor/modules/MonitorControl.pm +++ b/TAO/orbsvcs/examples/Notify/MC/TkMonitor/modules/MonitorControl.pm @@ -338,21 +338,26 @@ sub more { NotifyMonitoringExt::EventChannelQueueElementCount); return if (!defined $queuecount); $queuecount = $queuecount->last(); + if ($concount > 0) { $self->{'text'}->insert('end', "Queue: $queuecount messages"); } - if ($queuecount == 0) { + + my($queuesize) = $self->getStatisticData( + $name . '/' . + NotifyMonitoringExt::EventChannelQueueSize); + return if (!defined $queuesize); + $queuesize = $queuesize->last(); + + ## Since we're not obtaining queuecount and queuesize atomically, + ## we have to make sure that there's still something in the queue. + if ($queuecount == 0 && $queuesize > 0) { if ($concount > 0) { $self->{'text'}->insert('end', "\n"); } } else { - my($queuesize) = $self->getStatisticData( - $name . '/' . - NotifyMonitoringExt::EventChannelQueueSize); - return if (!defined $queuesize); - $queuesize = $queuesize->last(); $self->{'text'}->insert('end', ' approximately totaling ' . "$queuesize bytes\n\n"); diff --git a/TAO/orbsvcs/examples/Notify/MC/monitor/monitor.cpp b/TAO/orbsvcs/examples/Notify/MC/monitor/monitor.cpp index 4ddc1104d25..08328c53f7c 100644 --- a/TAO/orbsvcs/examples/Notify/MC/monitor/monitor.cpp +++ b/TAO/orbsvcs/examples/Notify/MC/monitor/monitor.cpp @@ -4,7 +4,12 @@ #include "ace/Get_Opt.h" #include "ace/OS_NS_ctype.h" -const ACE_TCHAR* monitor_ior = 0; +static const ACE_TCHAR* monitor_ior = 0; +static const char* shutdown_cmd = "shutdown"; +static const char* rm_consumer = "remove_consumer"; +static const char* rm_supplier = "remove_supplier"; +static const char* rm_consumeradmin = "remove_consumeradmin"; +static const char* rm_supplieradmin = "remove_supplieradmin"; static int parse_args (int argc, ACE_TCHAR *argv[]) @@ -38,10 +43,51 @@ sorter (const void* a, const void* b) return ACE_OS::strcmp (left, right); } +bool +process_command(CosNotification::NotificationServiceMonitorControl_ptr nsm, + char* buf, const char* command) +{ + if (ACE_OS::strstr(buf, command) == buf) + { + const char* name = buf + ACE_OS::strlen (command); + bool space = false; + size_t i = 0; + while (ACE_OS::ace_isspace (name[i])) + { + space = true; + i++; + } + if (space) + { + try + { + const char* start = name + i; + if (ACE_OS::strcmp (command, shutdown_cmd) == 0) + nsm->shutdown_event_channel (start); + else if (ACE_OS::strcmp (command, rm_consumer) == 0) + nsm->remove_consumer (start); + else if (ACE_OS::strcmp (command, rm_supplier) == 0) + nsm->remove_supplier (start); + else if (ACE_OS::strcmp (command, rm_consumeradmin) == 0) + nsm->remove_consumeradmin (start); + else if (ACE_OS::strcmp (command, rm_supplieradmin) == 0) + nsm->remove_supplieradmin (start); + } + catch (const CORBA::Exception& ex) + { + ACE_OS::strcat (buf, ": "); + ex._tao_print_exception (buf); + } + return true; + } + } + + return false; +} + int ACE_TMAIN (int argc, ACE_TCHAR* argv[]) { - static const char* shutdown = "shutdown"; int status = 0; try { @@ -109,14 +155,22 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) } else if (ACE_OS::strcmp (rl, "help") == 0) { - ACE_DEBUG ((LM_DEBUG, "names - Get a list of currently " - "available statistic names.\n" - "quit - Exit the monitor.\n" - "shutdown - Shut down an event " - "channel with the given name.\n" - "<statistic name> - Return the " - "information for the specified " - "statistic.\n")); + ACE_DEBUG ((LM_DEBUG, + "names - Get a list of " + "currently available statistic names.\n" + "quit - Exit the monitor.\n" + "remove_consumer - Remove a consumer " + "with the given name.\n" + "remove_supplier - Remove a supplier " + "with the given name.\n" + "remove_consumeradmin - Remove a consumer " + "admin with the given name.\n" + "remove_supplieradmin - Remove a supplier " + "admin with the given name.\n" + "shutdown - Shut down an " + "event channel with the given name.\n" + "<statistic name> - Return the " + "information for the specified statistic.\n")); } else if (ACE_OS::strcmp (rl, "names") == 0) { @@ -150,30 +204,17 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) } else { - if (ACE_OS::strstr(rl, shutdown) == rl) - { - const char* name = rl + ACE_OS::strlen (shutdown); - bool space = false; - size_t i = 0; - while (ACE_OS::ace_isspace (name[i])) - { - space = true; - i++; - } - if (space) - { - try - { - nsm->shutdown_event_channel (name + i); - } - catch (const CORBA::Exception& ex) - { - ACE_OS::strcat (rl, ": "); - ex._tao_print_exception (rl); - } - continue; - } - } + if (process_command (nsm, rl, shutdown_cmd)) + continue; + else if (process_command (nsm, rl, rm_consumer)) + continue; + else if (process_command (nsm, rl, rm_supplier)) + continue; + else if (process_command (nsm, rl, rm_consumeradmin)) + continue; + else if (process_command (nsm, rl, rm_supplieradmin)) + continue; + try { CosNotification::NotificationServiceMonitorControl::Data_var data = @@ -183,7 +224,8 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) { CosNotification::NotificationServiceMonitorControl::Numeric_var num = data->num (); - ACE_DEBUG ((LM_DEBUG, "%g\n", num->last)); + ACE_DEBUG ((LM_DEBUG, "Last: %g Average: %g\n", + num->last, num->average)); } else { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp index 657ea0a3d89..3beb7953c63 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp @@ -71,9 +71,12 @@ void TAO_Notify_Admin::cleanup_proxy (TAO_Notify_Proxy *proxy , bool is_supplier) { - this->remove (proxy); - + // We must clean up the proxy before calling remove. Doing it in the + // opposite order causes us to use invalid memory (through the call of + // id() into the proxy). ec_->cleanup_proxy (proxy->id(), is_supplier); + + this->remove (proxy); } void diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp index 90c4b945a24..e222b02d216 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.cpp @@ -35,6 +35,7 @@ TAO_Notify_Buffering_Strategy::TAO_Notify_Buffering_Strategy ( , local_not_full_ (global_queue_lock_) , local_not_empty_ (global_queue_lock_) , shutdown_ (false) +, tracker_ (0) { } @@ -93,6 +94,10 @@ TAO_Notify_Buffering_Strategy::oldest_event (void) return tv; } +TAO_Notify_Buffering_Strategy::Tracker::~Tracker (void) +{ +} + int TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* method_request) { @@ -160,7 +165,12 @@ TAO_Notify_Buffering_Strategy::enqueue (TAO_Notify_Method_Request_Queueable* met local_not_empty_.signal (); } - return ACE_Utils::truncate_cast<int> (this->msg_queue_.message_count ()); + + size_t count = this->msg_queue_.message_count (); + if (this->tracker_ != 0) + this->tracker_->update_queue_count (count); + + return ACE_Utils::truncate_cast<int> (count); } int @@ -187,6 +197,9 @@ TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &me if (this->msg_queue_.dequeue (mb) == -1) return -1; + if (this->tracker_ != 0) + this->tracker_->update_queue_count (this->msg_queue_.message_count ()); + method_request = dynamic_cast<TAO_Notify_Method_Request_Queueable*>(mb); if (method_request == 0) @@ -199,6 +212,13 @@ TAO_Notify_Buffering_Strategy::dequeue (TAO_Notify_Method_Request_Queueable* &me return 1; } +void +TAO_Notify_Buffering_Strategy::set_tracker ( + TAO_Notify_Buffering_Strategy::Tracker* tracker) +{ + this->tracker_ = tracker; +} + int TAO_Notify_Buffering_Strategy::queue (TAO_Notify_Method_Request_Queueable* method_request) { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h index 354832931d2..df5254fea9e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Buffering_Strategy.h @@ -72,6 +72,17 @@ public: /// Provide the time value of the oldest event in the queue. ACE_Time_Value oldest_event (void); + /// This interface allows tracking of the queue size + class TAO_Notify_Serv_Export Tracker + { + public: + virtual ~Tracker (void); + virtual void update_queue_count (size_t count) = 0; + }; + + /// Set the tracker object. This strategy does not own the tracker. + void set_tracker (Tracker* tracker); + private: /// Apply the Order Policy and queue. return -1 on error. @@ -115,6 +126,9 @@ private: /// Flag to shutdown. bool shutdown_; + + /// Optional queue tracker + Tracker* tracker_; }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventChannel.h b/TAO/orbsvcs/orbsvcs/Notify/EventChannel.h index 9adc95ec755..55f0936b14a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventChannel.h +++ b/TAO/orbsvcs/orbsvcs/Notify/EventChannel.h @@ -94,8 +94,8 @@ public: TAO_Notify_ProxySupplier * find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position); // cleanup any proxy resources held by the EC - virtual void cleanup_proxy (CosNotifyChannelAdmin::ProxyID id - , bool is_supplier); + virtual void cleanup_proxy (CosNotifyChannelAdmin::ProxyID id, + bool is_supplier); /// Shutdown virtual int shutdown (void); @@ -104,6 +104,14 @@ public: /// TAO_Notify_Container_T requires a destroy method virtual void destroy (void); + /// This is public to allow TAO_MonitorConsumerAdmin access. + virtual CosNotifyChannelAdmin::ConsumerAdmin_ptr + get_consumeradmin (CosNotifyChannelAdmin::AdminID id); + + /// This is public to allow TAO_MonitorSupplierAdmin access. + virtual CosNotifyChannelAdmin::SupplierAdmin_ptr + get_supplieradmin (CosNotifyChannelAdmin::AdminID id); + private: typedef TAO_Notify_Container_T <TAO_Notify_ConsumerAdmin> TAO_Notify_ConsumerAdmin_Container; typedef TAO_Notify_Container_T <TAO_Notify_SupplierAdmin> TAO_Notify_SupplierAdmin_Container; @@ -144,10 +152,6 @@ protected: virtual ::CosNotifyChannelAdmin::SupplierAdmin_ptr new_for_suppliers (CosNotifyChannelAdmin::InterFilterGroupOperator op, CosNotifyChannelAdmin::AdminID_out id); - virtual ::CosNotifyChannelAdmin::ConsumerAdmin_ptr get_consumeradmin (CosNotifyChannelAdmin::AdminID id); - - virtual ::CosNotifyChannelAdmin::SupplierAdmin_ptr get_supplieradmin (CosNotifyChannelAdmin::AdminID id); - virtual ::CosNotifyChannelAdmin::AdminIDSeq * get_all_consumeradmins (void); virtual ::CosNotifyChannelAdmin::AdminIDSeq * get_all_supplieradmins (void); diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Control.h b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Control.h index 71235ca6d5d..d04c998a669 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Control.h +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Control.h @@ -9,6 +9,10 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL #define TAO_NS_CONTROL_SHUTDOWN "shutdown" +#define TAO_NS_CONTROL_REMOVE_CONSUMER "remove_consumer" +#define TAO_NS_CONTROL_REMOVE_SUPPLIER "remove_supplier" +#define TAO_NS_CONTROL_REMOVE_CONSUMERADMIN "remove_consumeradmin" +#define TAO_NS_CONTROL_REMOVE_SUPPLIERADMIN "remove_supplieradmin" class TAO_Notify_MC_Export TAO_NS_Control: public TAO_NS_Generic { @@ -16,8 +20,9 @@ public: /// This is here due to virtual functions. virtual ~TAO_NS_Control (void); - /// Execute the control object - virtual void execute (const char* command) = 0; + /// Execute the control object. Only return false if the command + /// specified is not supported. + virtual bool execute (const char* command) = 0; protected: /// Construct a control object. diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMC.idl b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMC.idl index a9b447353a6..e3b04e52062 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMC.idl +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMC.idl @@ -67,9 +67,22 @@ module CosNotification void clear_statistics (in NameList names) raises (InvalidName); + // Control commands begin here void shutdown_event_channel (in string name) raises (InvalidName); + void remove_consumer (in string name) + raises (InvalidName); + + void remove_supplier (in string name) + raises (InvalidName); + + void remove_consumeradmin (in string name) + raises (InvalidName); + + void remove_supplieradmin (in string name) + raises (InvalidName); + oneway void shutdown (); }; }; diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.cpp b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.cpp index 5ca7fdf69c5..b83f5dd815f 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.cpp @@ -115,19 +115,52 @@ NotificationServiceMonitor_i::clear_statistics (const CosNotification::Notificat void NotificationServiceMonitor_i::shutdown_event_channel (const char* name) { + this->send_control_command (name, TAO_NS_CONTROL_SHUTDOWN); +} + +void +NotificationServiceMonitor_i::remove_consumer (const char* name) +{ + this->send_control_command (name, TAO_NS_CONTROL_REMOVE_CONSUMER); +} + +void +NotificationServiceMonitor_i::remove_supplier (const char* name) +{ + this->send_control_command (name, TAO_NS_CONTROL_REMOVE_SUPPLIER); +} + +void +NotificationServiceMonitor_i::remove_consumeradmin (const char* name) +{ + this->send_control_command (name, TAO_NS_CONTROL_REMOVE_CONSUMERADMIN); +} + +void +NotificationServiceMonitor_i::remove_supplieradmin (const char* name) +{ + this->send_control_command (name, TAO_NS_CONTROL_REMOVE_SUPPLIERADMIN); +} + +void +NotificationServiceMonitor_i::send_control_command (const char* name, + const char* cmd) +{ TAO_Control_Registry* instance = TAO_Control_Registry::instance (); TAO_NS_Control* control = instance->get (name); - if (control == 0) + + // If we didn't find a control object with the given name, or the + // execution of the control object failed, we must throw an exception. + // The control object execution should only return false when the + // command given does not correspond to one that is supported by the + // control object. + if (control == 0 || !control->execute (cmd)) { CosNotification::NotificationServiceMonitorControl::NameList invalid (1); invalid.length (1); invalid[0] = name; throw CosNotification::NotificationServiceMonitorControl::InvalidName (invalid); } - else - { - control->execute (TAO_NS_CONTROL_SHUTDOWN); - } } void diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.h b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.h index 7d375c40e72..26c06711dbc 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.h +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/NotificationServiceMonitor_i.h @@ -43,10 +43,25 @@ public: /// Shut down an event channel by name virtual void shutdown_event_channel (const char* name); + /// Remove a consumer by name + virtual void remove_consumer (const char* name); + + /// Remove a supplier by name + virtual void remove_supplier (const char* name); + + /// Remove a consumer admin by name + virtual void remove_consumeradmin (const char* name); + + /// Remove a supplier admin by name + virtual void remove_supplieradmin (const char* name); + /// Call shutdown() on the ORB if one was provided during construction virtual void shutdown (void); private: + void send_control_command (const char* name, + const char* cmd); + void get_data (TAO_Statistic_Registry* registry, const char* name, CosNotification::NotificationServiceMonitorControl::Data& data); diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.cpp b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.cpp index 3f97026eb85..1dfcd8ec26e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.cpp @@ -6,8 +6,6 @@ # include "Statistic.inl" #endif /* ! __ACE_INLINE__ */ -static size_t Growth_Size = 32; - TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Statistic::TAO_Statistic (const char* name, @@ -18,6 +16,8 @@ TAO_Statistic::TAO_Statistic (const char* name, minimum_set_ (false), minimum_ (0), maximum_ (0), + sum_ (0), + sum_of_squares_ (0), last_ (0) { } @@ -28,7 +28,7 @@ TAO_Statistic::~TAO_Statistic (void) if (this->type_ == TS_LIST) for(size_t i = 0; i < this->index_; i++) { - delete [] this->data_[i].string_; + delete [] this->data_[i]; } } @@ -40,21 +40,15 @@ TAO_Statistic::calculate (void) void TAO_Statistic::receive (double data) { - ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->mutex_); if (this->type_ == TS_LIST) throw Invalid_Operation(); + ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->mutex_); if (this->type_ != TS_COUNTER) { - size_t max = this->data_.max_size(); - if (this->index_ >= max) - { - if (this->data_.max_size(max + Growth_Size) == -1) - { - throw CORBA::NO_MEMORY(); - } - } - this->data_[this->index_++].number_ = data; + this->sum_ += data; + this->sum_of_squares_ += (data * data); + ++this->index_; } if (this->type_ == TS_COUNTER) @@ -88,14 +82,14 @@ TAO_Statistic::receive (const TAO_Statistic::List& data) ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->mutex_); for(size_t i = 0; i < this->index_; i++) { - delete [] this->data_[i].string_; + delete [] this->data_[i]; } this->index_ = data.size (); this->data_.max_size (this->index_); for(size_t i = 0; i < this->index_; i++) { - this->data_[i].string_ = CORBA::string_dup (data[i].c_str ()); + this->data_[i] = CORBA::string_dup (data[i].c_str ()); } } @@ -108,13 +102,15 @@ TAO_Statistic::clear (void) if (this->type_ == TS_LIST) for(size_t i = 0; i < this->index_; i++) { - delete [] this->data_[i].string_; + delete [] this->data_[i]; } this->index_ = 0; this->minimum_set_ = false; this->minimum_ = 0; this->maximum_ = 0; + this->sum_ = 0; + this->sum_of_squares_ = 0; this->last_ = 0; } @@ -125,13 +121,7 @@ TAO_Statistic::average (void) const throw Invalid_Operation(); ACE_READ_GUARD_RETURN (ACE_SYNCH_RW_MUTEX, guard, this->mutex_, 0); - - double accum = 0; - for(size_t i = 0; i < this->index_; i++) - { - accum += this->data_[i].number_; - } - return (this->index_ == 0 ? 0.0 :accum / this->index_); + return (this->index_== 0 ? 0.0 : this->sum_ / this->index_); } double @@ -141,14 +131,7 @@ TAO_Statistic::sum_of_squares (void) const throw Invalid_Operation(); ACE_READ_GUARD_RETURN (ACE_SYNCH_RW_MUTEX, guard, this->mutex_, 0); - - double accum = 0; - for(size_t i = 0; i < this->index_; i++) - { - double data = this->data_[i].number_; - accum += (data * data); - } - return accum; + return this->sum_of_squares_; } TAO_Statistic::List @@ -161,9 +144,10 @@ TAO_Statistic::get_list (void) const ACE_READ_GUARD_RETURN (ACE_SYNCH_RW_MUTEX, guard, this->mutex_, list); for(size_t i = 0; i < this->index_; i++) { - list.push_back (this->data_[i].string_); + list.push_back (this->data_[i]); } return list; } + TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.h b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.h index 5ac1a563579..477b628be3f 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.h +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControl/Statistic.h @@ -83,16 +83,13 @@ private: mutable ACE_SYNCH_RW_MUTEX mutex_; size_t index_; - union Data { - const char* string_; - double number_; - }; - ACE_Array_Base<Data> data_; - bool minimum_set_; double minimum_; double maximum_; + double sum_; + double sum_of_squares_; double last_; + ACE_Array_Base<const char*> data_; }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.cpp b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.cpp index fa4b84b8b59..da4b7369b06 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.cpp @@ -1,15 +1,112 @@ // $Id$ #include "orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.h" #include "orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h" +#include "orbsvcs/Notify/MonitorControl/Control_Registry.h" +#include "orbsvcs/Notify/MonitorControl/Statistic.h" +#include "orbsvcs/Notify/MonitorControl/Control.h" +#include "orbsvcs/Notify/Buffering_Strategy.h" +#include "orbsvcs/Notify/ThreadPool_Task.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL +class ConsumerAdminControl: public TAO_NS_Control +{ +public: + ConsumerAdminControl (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + CosNotifyChannelAdmin::AdminID id) + : TAO_NS_Control (name.c_str ()), + ec_ (ec), + id_ (id) { + } + + virtual bool execute (const char* command) { + if (ACE_OS::strcmp (command, TAO_NS_CONTROL_REMOVE_CONSUMERADMIN) == 0) + { + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + this->ec_->get_consumeradmin (this->id_); + if (!CORBA::is_nil (admin.in ())) + admin->destroy (); + } + else + { + return false; + } + + return true; + } + +private: + TAO_MonitorEventChannel* ec_; + CosNotifyChannelAdmin::AdminID id_; +}; + TAO_MonitorConsumerAdmin::TAO_MonitorConsumerAdmin (void) + : stat_name_ (), + control_name_ (), + queue_size_ (0) { } TAO_MonitorConsumerAdmin::~TAO_MonitorConsumerAdmin (void) { + // First, make sure we can get down to the real ec type + TAO_MonitorEventChannel* ec = + dynamic_cast<TAO_MonitorEventChannel*> (this->ec_.get ()); + if (ec != 0) + { + ec->unregister_statistic (this->stat_name_); + ec->remove_consumeradmin (this->id ()); + TAO_Control_Registry* cinstance = TAO_Control_Registry::instance (); + cinstance->remove (this->control_name_); + } + + // We don't own queue_size_, so we must not delete it +} + +void +TAO_MonitorConsumerAdmin::register_stats_controls ( + TAO_MonitorEventChannel* mec, + const ACE_CString& base) +{ + // Set up the statistic name, create it and register it + this->stat_name_ = base + "/"; + this->stat_name_ += NotifyMonitoringExt::EventChannelQueueSize; + ACE_NEW_THROW_EX (this->queue_size_, + TAO_Statistic (this->stat_name_.c_str (), + TAO_Statistic::TS_NUMBER), + CORBA::NO_MEMORY ()); + if (!mec->register_statistic (this->stat_name_, this->queue_size_)) + { + delete this->queue_size_; + this->queue_size_ = 0; + throw NotifyMonitoringExt::NameAlreadyUsed (); + } + + // If we've successfully registered the statistic, hook us into the + // buffering strategy so it can let us know when the queue changes. + TAO_Notify_ThreadPool_Task* tpt = + dynamic_cast<TAO_Notify_ThreadPool_Task*> (this->get_worker_task ()); + if (tpt != 0) + { + TAO_Notify_Buffering_Strategy* bs = tpt->buffering_strategy (); + bs->set_tracker (this); + } + + this->control_name_ = base; + ConsumerAdminControl* control = 0; + ACE_NEW_THROW_EX (control, + ConsumerAdminControl (mec, + this->control_name_.c_str (), + this->id ()), + CORBA::NO_MEMORY ()); + TAO_Control_Registry* cinstance = TAO_Control_Registry::instance (); + if (!cinstance->add (control)) + { + delete control; + ACE_ERROR ((LM_ERROR, "Unable to add control: %s\n", + this->control_name_.c_str ())); + } } CosNotifyChannelAdmin::ProxySupplier_ptr @@ -59,4 +156,16 @@ TAO_MonitorConsumerAdmin::obtain_notification_push_supplier ( return proxy._retn (); } +void +TAO_MonitorConsumerAdmin::update_queue_count (size_t count) +{ + if (this->queue_size_ != 0) + // The message blocks stored in this queue are of size + // zero. However, each message block is a + // TAO_Notify_Event which has an associated set of data + // which can be used to estimate the amount of memory + // allocated to the message queue + this->queue_size_->receive (count * sizeof (TAO_Notify_Event)); +} + TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.h b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.h index e6485ee5edc..f9791eb781d 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.h +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.h @@ -9,32 +9,51 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "ace/SString.h" #include "orbsvcs/Notify/ConsumerAdmin.h" +#include "orbsvcs/Notify/Buffering_Strategy.h" #include "orbsvcs/Notify/MonitorControlExt/NotifyMonitoringExtS.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL +class TAO_Statistic; +class TAO_MonitorEventChannel; + class TAO_Notify_MC_Ext_Export TAO_MonitorConsumerAdmin: public virtual TAO_Notify_ConsumerAdmin, - public virtual POA_NotifyMonitoringExt::ConsumerAdmin + public virtual POA_NotifyMonitoringExt::ConsumerAdmin, + public virtual TAO_Notify_Buffering_Strategy::Tracker { public: /// Construct a monitor supplier admin TAO_MonitorConsumerAdmin (void); /// Remove the statistics for this event channel - ~TAO_MonitorConsumerAdmin (void); + virtual ~TAO_MonitorConsumerAdmin (void); + + /// Register our statistic and control objects + void register_stats_controls (TAO_MonitorEventChannel* mec, + const ACE_CString& name); - virtual CosNotifyChannelAdmin::ProxySupplier_ptr + virtual CosNotifyChannelAdmin::ProxySupplier_ptr obtain_named_notification_push_supplier ( CosNotifyChannelAdmin::ClientType ctype, CosNotifyChannelAdmin::ProxyID_out proxy_id, const char * name); - virtual CosNotifyChannelAdmin::ProxySupplier_ptr + virtual CosNotifyChannelAdmin::ProxySupplier_ptr obtain_notification_push_supplier ( CosNotifyChannelAdmin::ClientType ctype, CosNotifyChannelAdmin::ProxyID_out proxy_id); + + /// Receive the queue count and store the approximated queue size in + /// our statistic object. + virtual void update_queue_count (size_t count); + +private: + ACE_CString stat_name_; + ACE_CString control_name_; + TAO_Statistic* queue_size_; }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp index 1939fa09da6..f31e205fbba 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.cpp @@ -1,11 +1,13 @@ // $Id$ #include "orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h" -#include "orbsvcs/Notify/MonitorControlExt/NotifyMonitoringExtS.h" +#include "orbsvcs/Notify/MonitorControlExt/MonitorConsumerAdmin.h" +#include "orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.h" #include "orbsvcs/Notify/MonitorControl/Statistic_Registry.h" #include "orbsvcs/Notify/MonitorControl/Dynamic_Statistic.h" #include "orbsvcs/Notify/MonitorControl/Control_Registry.h" #include "orbsvcs/Notify/Buffering_Strategy.h" -#include "orbsvcs/Notify/ConsumerAdmin.h" +#include "orbsvcs/Notify/ProxySupplier.h" +#include "orbsvcs/Notify/ProxyConsumer.h" #include "orbsvcs/Notify/ThreadPool_Task.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL @@ -14,49 +16,77 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL // Dynamic Statistic Classes // ****************************************************************** -class EventChannelConsumers: +class EventChannelConsumersSuppliers: public TAO_Dynamic_Statistic<TAO_MonitorEventChannel> { public: - EventChannelConsumers (TAO_MonitorEventChannel* ec, - const ACE_CString& name, - TAO_Statistic::Information_Type type) + EventChannelConsumersSuppliers (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + TAO_Statistic::Information_Type type, + bool is_supplier = false) : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec, name.c_str (), - type) { + type), + is_supplier_ (is_supplier) { } virtual void calculate (void) { if (this->type () == TAO_Statistic::TS_LIST) { TAO_Statistic::List list; - this->interf_->get_consumers (&list); + if (this->is_supplier_) + this->interf_->get_suppliers (&list); + else + this->interf_->get_consumers (&list); this->receive (list); } else - this->receive (static_cast<double> (this->interf_->get_consumers (0))); + { + if (this->is_supplier_) + this->receive (static_cast<double> (this->interf_->get_suppliers (0))); + else + this->receive (static_cast<double> (this->interf_->get_consumers (0))); + } } + +private: + bool is_supplier_; }; -class EventChannelSuppliers: +class EventChannelConsumerSupplierAdmins: public TAO_Dynamic_Statistic<TAO_MonitorEventChannel> { public: - EventChannelSuppliers (TAO_MonitorEventChannel* ec, - const ACE_CString& name, - TAO_Statistic::Information_Type type) + EventChannelConsumerSupplierAdmins (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + TAO_Statistic::Information_Type type, + bool is_supplier = false) : TAO_Dynamic_Statistic<TAO_MonitorEventChannel> (ec, name.c_str (), - type) { + type), + is_supplier_ (is_supplier) { } virtual void calculate (void) { if (this->type () == TAO_Statistic::TS_LIST) { TAO_Statistic::List list; - this->interf_->get_suppliers (&list); + if (this->is_supplier_) + this->interf_->get_supplieradmins (&list); + else + this->interf_->get_consumeradmins (&list); this->receive (list); } else - this->receive (static_cast<double> (this->interf_->get_suppliers (0))); + { + if (this->is_supplier_) + this->receive ( + static_cast<double> (this->interf_->get_supplieradmins (0))); + else + this->receive ( + static_cast<double> (this->interf_->get_consumeradmins (0))); + } } + +private: + bool is_supplier_; }; class QueuedEvents: public TAO_Dynamic_Statistic<TAO_MonitorEventChannel> @@ -123,17 +153,72 @@ public: ec_ (ec) { } - virtual void execute (const char* command) { + virtual bool execute (const char* command) { if (ACE_OS::strcmp (command, TAO_NS_CONTROL_SHUTDOWN) == 0) { this->ec_->destroy (); } + else + { + return false; + } + + return true; } private: TAO_MonitorEventChannel* ec_; }; +class RemoveConsumerSupplierControl: public TAO_NS_Control +{ +public: + RemoveConsumerSupplierControl (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + CosNotifyChannelAdmin::ProxyID id, + bool is_supplier) + : TAO_NS_Control (name.c_str ()), + ec_ (ec), + id_ (id), + is_supplier_ (is_supplier) { + } + + virtual bool execute (const char* command) { + bool status = true; + if (this->is_supplier_) + { + if (ACE_OS::strcmp (command, TAO_NS_CONTROL_REMOVE_SUPPLIER) == 0) + { + status = this->ec_->destroy_supplier (this->id_); + } + else + { + // Indicate that the command was unknown + return false; + } + } + else + { + if (ACE_OS::strcmp (command, TAO_NS_CONTROL_REMOVE_CONSUMER) == 0) + { + status = this->ec_->destroy_consumer (this->id_); + } + else + { + // Indicate that the command was unknown + return false; + } + } + + return status; + } + +private: + TAO_MonitorEventChannel* ec_; + CosNotifyChannelAdmin::ProxyID id_; + bool is_supplier_; +}; + // ****************************************************************** // TAO_MonitorEventChannel Methods // ****************************************************************** @@ -146,6 +231,7 @@ TAO_MonitorEventChannel::TAO_MonitorEventChannel (const char* name) TAO_MonitorEventChannel::~TAO_MonitorEventChannel (void) { + ACE_GUARD (ACE_SYNCH_MUTEX, guard, this->names_mutex_); TAO_Statistic_Registry* instance = TAO_Statistic_Registry::instance (); size_t size = this->stat_names_.size (); @@ -169,6 +255,47 @@ TAO_MonitorEventChannel::name (void) const return this->name_; } +bool +TAO_MonitorEventChannel::register_statistic( + const ACE_CString& name, + TAO_Statistic* stat, + TAO_Statistic_Registry* instance) +{ + // Allow the caller to provide the registry + if (instance == 0) + instance = TAO_Statistic_Registry::instance (); + + // Add the statistic to the registry and if it's successful + // add the name to the statistic names list. + bool added = instance->add (stat); + if (added) + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, this->names_mutex_, added); + this->stat_names_.push_back (name); + } + return added; +} + +bool +TAO_MonitorEventChannel::unregister_statistic( + const ACE_CString& name, + TAO_Statistic_Registry* instance) +{ + // Allow the caller to provide the registry + if (instance == 0) + instance = TAO_Statistic_Registry::instance (); + + // Remove the statistic from the registry and if it's successful + // remove the name from the statistic names list. + bool removed = instance->remove (name); + if (removed) + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, this->names_mutex_, removed); + this->remove_list_name(this->stat_names_, name); + } + return removed; +} + void TAO_MonitorEventChannel::map_supplier_proxy ( CosNotifyChannelAdmin::ProxyID id, @@ -177,13 +304,35 @@ TAO_MonitorEventChannel::map_supplier_proxy ( if (name.length () == 0) throw NotifyMonitoringExt::NameMapError (); + ACE_CString full = this->name_ + "/" + name; ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->supplier_mutex_); - if (this->is_duplicate_name (this->supplier_map_, name)) + if (this->is_duplicate_name (this->supplier_map_, full)) throw NotifyMonitoringExt::NameAlreadyUsed (); - int status = this->supplier_map_.bind (id, name); - if (status != 0) + if (this->supplier_map_.bind (id, full) != 0) throw NotifyMonitoringExt::NameMapError (); + + // The view from the outside sees a supplier proxy as a consumer. + // So, we'll register a RemoveConsumerSupplierControl for each supplier + // proxy indicating a consumer (false) + RemoveConsumerSupplierControl* rcsc = 0; + ACE_NEW_THROW_EX (rcsc, + RemoveConsumerSupplierControl (this, full, id, false), + CORBA::NO_MEMORY ()); + + TAO_Control_Registry* cinstance = + TAO_Control_Registry::instance (); + if (cinstance->add (rcsc)) + { + ACE_GUARD (ACE_SYNCH_MUTEX, guard, this->names_mutex_); + this->control_names_.push_back (full); + } + else + { + delete rcsc; + ACE_ERROR ((LM_ERROR, "Unable to add control: %s\n", + full.c_str ())); + } } void @@ -194,43 +343,86 @@ TAO_MonitorEventChannel::map_consumer_proxy ( if (name.length () == 0) throw NotifyMonitoringExt::NameMapError (); + ACE_CString full = this->name_ + "/" + name; ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->consumer_mutex_); - if (this->is_duplicate_name (this->consumer_map_, name)) + if (this->is_duplicate_name (this->consumer_map_, full)) throw NotifyMonitoringExt::NameAlreadyUsed (); - int status = this->consumer_map_.bind (id, name); - if (status != 0) + if (this->consumer_map_.bind (id, full) != 0) throw NotifyMonitoringExt::NameMapError (); + + // The view from the outside sees a consumer proxy as a supplier. + // So, we'll register a RemoveConsumerSupplierControl for each consumer + // proxy indicating a supplier (true) + RemoveConsumerSupplierControl* rcsc = 0; + ACE_NEW_THROW_EX (rcsc, + RemoveConsumerSupplierControl (this, full, id, true), + CORBA::NO_MEMORY ()); + + TAO_Control_Registry* cinstance = + TAO_Control_Registry::instance (); + if (cinstance->add (rcsc)) + { + ACE_GUARD (ACE_SYNCH_MUTEX, guard, this->names_mutex_); + this->control_names_.push_back (full); + } + else + { + delete rcsc; + ACE_ERROR ((LM_ERROR, "Unable to add control: %s\n", + full.c_str ())); + } } void -TAO_MonitorEventChannel::cleanup_proxy (CosNotifyChannelAdmin::ProxyID id - , bool is_supplier) +TAO_MonitorEventChannel::cleanup_proxy (CosNotifyChannelAdmin::ProxyID id, + bool is_supplier) { - if (is_supplier) { - this->unmap_supplier_proxy (id); - } - else { - this->unmap_consumer_proxy (id); - } + ACE_CString name; + if (is_supplier) + { + ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->supplier_mutex_); + // It may seem like a good idea to throw a + // NotifyMonitoringExt::NameMapError() exception if we can't unbind the + // id from the map, but we can't. This method is called indirectly + // from CORBA calls (e.g., disconnect_structured_push_supplier) and at + // best it will cause a CORBA::UNKNOWN exception and at worst will + // cause an unexpected exception call which will cause the program to + // abort. + this->supplier_map_.unbind (id, name); + } + else + { + ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->consumer_mutex_); + // The same goes for this one too. + this->consumer_map_.unbind (id, name); + } + + if (name.length () != 0) + { + TAO_Control_Registry* cinstance = + TAO_Control_Registry::instance (); + cinstance->remove(name); + + ACE_GUARD (ACE_SYNCH_MUTEX, guard, this->names_mutex_); + this->remove_list_name (this->control_names_, name); + } } void -TAO_MonitorEventChannel::unmap_supplier_proxy -(CosNotifyChannelAdmin::ProxyID id) +TAO_MonitorEventChannel::remove_consumeradmin ( + CosNotifyChannelAdmin::ProxyID id) { - if (this->supplier_map_.unbind (id) != 0) { - throw NotifyMonitoringExt::NameMapError (); - } + ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->consumeradmin_mutex_); + this->consumeradmin_map_.unbind (id); } void -TAO_MonitorEventChannel::unmap_consumer_proxy -(CosNotifyChannelAdmin::ProxyID id) +TAO_MonitorEventChannel::remove_supplieradmin ( + CosNotifyChannelAdmin::ProxyID id) { - if (this->consumer_map_.unbind (id) != 0) { - throw NotifyMonitoringExt::NameMapError (); - } + ACE_WRITE_GUARD (ACE_SYNCH_RW_MUTEX, guard, this->supplieradmin_mutex_); + this->supplieradmin_map_.unbind (id); } void @@ -254,10 +446,7 @@ TAO_MonitorEventChannel::add_stats (const char* name) CORBA::NO_MEMORY ()); ACE_Time_Value tv (ACE_OS::gettimeofday()); timestamp->receive (tv.sec () + (tv.usec () / 1000000.0)); - bool added = instance->add (timestamp); - if (added) - this->stat_names_.push_back (stat_name); - else + if (!this->register_statistic (stat_name, timestamp, instance)) { delete timestamp; ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", @@ -266,16 +455,14 @@ TAO_MonitorEventChannel::add_stats (const char* name) stat_name = dir_name + NotifyMonitoringExt::EventChannelConsumerCount; - EventChannelConsumers* consumers = 0; + EventChannelConsumersSuppliers* consumers = 0; ACE_NEW_THROW_EX (consumers, - EventChannelConsumers (this, + EventChannelConsumersSuppliers ( + this, stat_name.c_str (), TAO_Statistic::TS_NUMBER), CORBA::NO_MEMORY ()); - added = instance->add (consumers); - if (added) - this->stat_names_.push_back (stat_name); - else + if (!this->register_statistic (stat_name, consumers, instance)) { delete consumers; ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", @@ -286,14 +473,12 @@ TAO_MonitorEventChannel::add_stats (const char* name) NotifyMonitoringExt::EventChannelConsumerNames; consumers = 0; ACE_NEW_THROW_EX (consumers, - EventChannelConsumers (this, + EventChannelConsumersSuppliers ( + this, stat_name.c_str (), TAO_Statistic::TS_LIST), CORBA::NO_MEMORY ()); - added = instance->add (consumers); - if (added) - this->stat_names_.push_back (stat_name); - else + if (!this->register_statistic (stat_name, consumers, instance)) { delete consumers; ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", @@ -302,16 +487,15 @@ TAO_MonitorEventChannel::add_stats (const char* name) stat_name = dir_name + NotifyMonitoringExt::EventChannelSupplierCount; - EventChannelSuppliers* suppliers = 0; + EventChannelConsumersSuppliers* suppliers = 0; ACE_NEW_THROW_EX (suppliers, - EventChannelSuppliers (this, + EventChannelConsumersSuppliers ( + this, stat_name.c_str (), - TAO_Statistic::TS_NUMBER), + TAO_Statistic::TS_NUMBER, + true), CORBA::NO_MEMORY ()); - added = instance->add (suppliers); - if (added) - this->stat_names_.push_back (stat_name); - else + if (!this->register_statistic (stat_name, suppliers, instance)) { delete suppliers; ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", @@ -322,30 +506,92 @@ TAO_MonitorEventChannel::add_stats (const char* name) NotifyMonitoringExt::EventChannelSupplierNames; suppliers = 0; ACE_NEW_THROW_EX (suppliers, - EventChannelSuppliers (this, + EventChannelConsumersSuppliers ( + this, stat_name.c_str (), - TAO_Statistic::TS_LIST), + TAO_Statistic::TS_LIST, + true), CORBA::NO_MEMORY ()); - added = instance->add (suppliers); - if (added) - this->stat_names_.push_back (stat_name); - else + if (!this->register_statistic (stat_name, suppliers, instance)) { delete suppliers; ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", stat_name.c_str ())); } + stat_name = dir_name + + NotifyMonitoringExt::EventChannelConsumerAdminCount; + EventChannelConsumerSupplierAdmins* conadmins = 0; + ACE_NEW_THROW_EX (conadmins, + EventChannelConsumerSupplierAdmins ( + this, + stat_name.c_str (), + TAO_Statistic::TS_NUMBER), + CORBA::NO_MEMORY ()); + if (!this->register_statistic (stat_name, conadmins, instance)) + { + delete conadmins; + ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", + stat_name.c_str ())); + } + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelConsumerAdminNames; + conadmins = 0; + ACE_NEW_THROW_EX (conadmins, + EventChannelConsumerSupplierAdmins ( + this, + stat_name.c_str (), + TAO_Statistic::TS_LIST), + CORBA::NO_MEMORY ()); + if (!this->register_statistic (stat_name, conadmins, instance)) + { + delete conadmins; + ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", + stat_name.c_str ())); + } + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelSupplierAdminCount; + EventChannelConsumerSupplierAdmins* supadmins = 0; + ACE_NEW_THROW_EX (supadmins, + EventChannelConsumerSupplierAdmins ( + this, + stat_name.c_str (), + TAO_Statistic::TS_NUMBER, + true), + CORBA::NO_MEMORY ()); + if (!this->register_statistic (stat_name, supadmins, instance)) + { + delete supadmins; + ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", + stat_name.c_str ())); + } + + stat_name = dir_name + + NotifyMonitoringExt::EventChannelSupplierAdminNames; + supadmins = 0; + ACE_NEW_THROW_EX (supadmins, + EventChannelConsumerSupplierAdmins ( + this, + stat_name.c_str (), + TAO_Statistic::TS_LIST, + true), + CORBA::NO_MEMORY ()); + if (!this->register_statistic (stat_name, supadmins, instance)) + { + delete supadmins; + ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", + stat_name.c_str ())); + } + stat_name = dir_name + NotifyMonitoringExt::EventChannelQueueSize; QueuedEvents* events = 0; ACE_NEW_THROW_EX (events, QueuedEvents (this, stat_name.c_str (), TAO_Statistic::TS_NUMBER, false), CORBA::NO_MEMORY ()); - added = instance->add (events); - if (added) - this->stat_names_.push_back (stat_name); - else + if (!this->register_statistic (stat_name, events, instance)) { delete events; ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", @@ -359,10 +605,7 @@ TAO_MonitorEventChannel::add_stats (const char* name) QueuedEvents (this, stat_name.c_str (), TAO_Statistic::TS_NUMBER, true), CORBA::NO_MEMORY ()); - added = instance->add (events); - if (added) - this->stat_names_.push_back (stat_name); - else + if (!this->register_statistic (stat_name, events, instance)) { delete events; ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", @@ -375,10 +618,7 @@ TAO_MonitorEventChannel::add_stats (const char* name) OldestEvent (this, stat_name.c_str (), TAO_Statistic::TS_TIME), CORBA::NO_MEMORY ()); - added = instance->add (oldest); - if (added) - this->stat_names_.push_back (stat_name); - else + if (!this->register_statistic (stat_name, oldest, instance)) { delete oldest; ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", @@ -392,10 +632,7 @@ TAO_MonitorEventChannel::add_stats (const char* name) SlowestConsumers (this, stat_name.c_str (), TAO_Statistic::TS_LIST), CORBA::NO_MEMORY ()); - added = instance->add (slowest); - if (added) - this->stat_names_.push_back (stat_name); - else + if (!this->register_statistic (stat_name, slowest, instance)) { delete oldest; ACE_ERROR ((LM_ERROR, "Unable to add statistic: %s\n", @@ -410,9 +647,11 @@ TAO_MonitorEventChannel::add_stats (const char* name) ShutdownControl (this, this->name_), CORBA::NO_MEMORY ()); - added = cinstance->add (sd); - if (added) - this->control_names_.push_back (this->name_); + if (cinstance->add (sd)) + { + ACE_GUARD (ACE_SYNCH_MUTEX, guard, this->names_mutex_); + this->control_names_.push_back (this->name_); + } else { delete sd; @@ -422,6 +661,118 @@ TAO_MonitorEventChannel::add_stats (const char* name) } } +CosNotifyChannelAdmin::ConsumerAdmin_ptr +TAO_MonitorEventChannel::new_for_consumers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id) +{ + return this->named_new_for_consumers (op, id, 0); +} + +CosNotifyChannelAdmin::ConsumerAdmin_ptr +TAO_MonitorEventChannel::named_new_for_consumers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id, + const char* name) +{ + if (name != 0 && name[0] == 0) + throw NotifyMonitoringExt::NameMapError (); + + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + this->TAO_Notify_EventChannel::new_for_consumers (op, id); + + // WARNING: Internal knowledge of TAO + TAO_MonitorConsumerAdmin* low_admin = + dynamic_cast<TAO_MonitorConsumerAdmin*> (admin->_servant ()); + if (low_admin == 0) + // This shouldn't happen + throw CORBA::INTERNAL (); + else + { + // Build up the full name + ACE_CString full = this->name_ + "/"; + if (name == 0) + { + char idname[64]; + ACE_OS::sprintf(idname, "%d", id); + full += idname; + } + else + { + full += name; + } + + ACE_WRITE_GUARD_RETURN (ACE_SYNCH_RW_MUTEX, guard, + this->consumeradmin_mutex_, + CosNotifyChannelAdmin::ConsumerAdmin::_nil ()); + if (this->is_duplicate_name (this->consumeradmin_map_, full)) + throw NotifyMonitoringExt::NameAlreadyUsed (); + + if (this->consumeradmin_map_.bind (id, full) != 0) + throw NotifyMonitoringExt::NameMapError (); + + low_admin->register_stats_controls (this, full); + } + + return admin._retn (); +} + +CosNotifyChannelAdmin::SupplierAdmin_ptr +TAO_MonitorEventChannel::new_for_suppliers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id) +{ + return this->named_new_for_suppliers (op, id, 0); +} + +CosNotifyChannelAdmin::SupplierAdmin_ptr +TAO_MonitorEventChannel::named_new_for_suppliers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id, + const char* name) +{ + if (name != 0 && name[0] == 0) + throw NotifyMonitoringExt::NameMapError (); + + CosNotifyChannelAdmin::SupplierAdmin_var admin = + this->TAO_Notify_EventChannel::new_for_suppliers (op, id); + + // WARNING: Internal knowledge of TAO + TAO_MonitorSupplierAdmin* low_admin = + dynamic_cast<TAO_MonitorSupplierAdmin*> (admin->_servant ()); + if (low_admin == 0) + // This shouldn't happen + throw CORBA::INTERNAL (); + else + { + // Build up the full name + ACE_CString full = this->name_ + "/"; + if (name == 0) + { + char idname[64]; + ACE_OS::sprintf(idname, "%d", id); + full += idname; + } + else + { + full += name; + } + + ACE_WRITE_GUARD_RETURN (ACE_SYNCH_RW_MUTEX, guard, + this->supplieradmin_mutex_, + CosNotifyChannelAdmin::SupplierAdmin::_nil ()); + if (this->is_duplicate_name (this->supplieradmin_map_, full)) + throw NotifyMonitoringExt::NameAlreadyUsed (); + + if (this->supplieradmin_map_.bind (id, full) != 0) + throw NotifyMonitoringExt::NameMapError (); + + low_admin->register_stats_controls (this, full); + } + + return admin._retn (); +} + size_t TAO_MonitorEventChannel::get_consumers (TAO_Statistic::List* names) { @@ -510,6 +861,56 @@ TAO_MonitorEventChannel::get_suppliers (TAO_Statistic::List* names) return count; } +size_t +TAO_MonitorEventChannel::get_consumeradmins (TAO_Statistic::List* names) +{ + ACE_READ_GUARD_RETURN (ACE_SYNCH_RW_MUTEX, guard, + this->consumeradmin_mutex_, 0); + + CosNotifyChannelAdmin::AdminIDSeq_var admin_ids = + this->get_all_consumeradmins (); + return this->get_admins (this->consumeradmin_map_, admin_ids.in (), names); +} + +size_t +TAO_MonitorEventChannel::get_supplieradmins (TAO_Statistic::List* names) +{ + ACE_READ_GUARD_RETURN (ACE_SYNCH_RW_MUTEX, guard, + this->supplieradmin_mutex_, 0); + + CosNotifyChannelAdmin::AdminIDSeq_var admin_ids = + this->get_all_supplieradmins (); + return this->get_admins (this->supplieradmin_map_, admin_ids.in (), names); +} + +size_t +TAO_MonitorEventChannel::get_admins ( + TAO_MonitorEventChannel::Map& map, + const CosNotifyChannelAdmin::AdminIDSeq& admin_ids, + TAO_Statistic::List* names) +{ + size_t count = 0; + CORBA::ULong length = admin_ids.length (); + for(CORBA::ULong j = 0; j < length; j++) + { + if (names == 0) + { + if (map.find (admin_ids[j]) == 0) + count++; + } + else + { + ACE_CString name; + if (map.find (admin_ids[j], name) == 0) + { + count++; + names->push_back (name); + } + } + } + return count; +} + TAO_Notify_ThreadPool_Task* TAO_MonitorEventChannel::get_threadpool_task ( CosNotifyChannelAdmin::AdminID id) @@ -618,10 +1019,19 @@ TAO_MonitorEventChannel::determine_slowest_consumer ( double TAO_MonitorEventChannel::get_oldest_event (void) { - ACE_Time_Value tv (ACE_Time_Value::max_time); CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids = this->get_all_consumeradmins (); CORBA::ULong length = conadmin_ids->length (); + + // If there are no consumers, we will return the special case indicator + if (length == 0) + { + return 0.0; + } + + // Loop through each consumer admins thread pool task and have the + // buffering strategy give us the time of the oldest event. + ACE_Time_Value tv (ACE_Time_Value::max_time); for(CORBA::ULong j = 0; j < length; j++) { TAO_Notify_ThreadPool_Task* task = @@ -635,6 +1045,12 @@ TAO_MonitorEventChannel::get_oldest_event (void) } } + // If there was no message in any of the queues, our time value will be + // ACE_Time_Value::max_time. For this situation, we must return the + // special case indicator. + if (tv == ACE_Time_Value::max_time) + return 0.0; + return tv.sec () + (tv.usec () / 1000000.0); } @@ -656,4 +1072,111 @@ TAO_MonitorEventChannel::is_duplicate_name ( return false; } +void +TAO_MonitorEventChannel::remove_list_name ( + TAO_MonitorEventChannel::NameList& list, + const ACE_CString& name) +{ + size_t size = list.size (); + for(size_t i = 0; i < size; i++) + { + if (list[i] == name) + { + if (size == 1) + { + list.clear (); + } + else + { + if (i != size - 1) + { + list[i] = list[size - 1]; + } + list.resize (size - 1, ""); + } + break; + } + } +} + +bool +TAO_MonitorEventChannel::destroy_consumer ( + CosNotifyChannelAdmin::ProxyID id) +{ + // Since we just have a proxy id, we have to search through all of the + // consumer admins. We have no idea to which the proxy will belong. + CosNotifyChannelAdmin::AdminIDSeq_var conadmin_ids = + this->get_all_consumeradmins (); + CORBA::ULong length = conadmin_ids->length (); + for(CORBA::ULong j = 0; j < length; j++) + { + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + this->get_consumeradmin (conadmin_ids[j]); + if (!CORBA::is_nil (admin.in ())) + { + try + { + CosNotifyChannelAdmin::ProxySupplier_var supplier = + admin->get_proxy_supplier(id); + if (!CORBA::is_nil(supplier.in())) + { + // WARNING: Internal knowledge of TAO and the + // Notification Service implementation. + TAO_Notify_ProxySupplier* low_proxy = + dynamic_cast<TAO_Notify_ProxySupplier*> ( + supplier->_servant ()); + + low_proxy->destroy (); + return true; + } + } + catch(const CosNotifyChannelAdmin::ProxyNotFound&) + { + } + } + } + + return false; +} + +bool +TAO_MonitorEventChannel::destroy_supplier ( + CosNotifyChannelAdmin::ProxyID id) +{ + // Since we just have a proxy id, we have to search through all of the + // supplier admins. We have no idea to which the proxy will belong. + CosNotifyChannelAdmin::AdminIDSeq_var supadmin_ids = + this->get_all_supplieradmins (); + CORBA::ULong length = supadmin_ids->length (); + for(CORBA::ULong j = 0; j < length; j++) + { + CosNotifyChannelAdmin::SupplierAdmin_var admin = + this->get_supplieradmin (supadmin_ids[j]); + if (!CORBA::is_nil (admin.in ())) + { + try + { + CosNotifyChannelAdmin::ProxyConsumer_var consumer = + admin->get_proxy_consumer(id); + if (!CORBA::is_nil(consumer.in())) + { + // WARNING: Internal knowledge of TAO and the + // Notification Service implementation. + TAO_Notify_ProxyConsumer* low_proxy = + dynamic_cast<TAO_Notify_ProxyConsumer*> ( + consumer->_servant ()); + + low_proxy->destroy(); + return true; + } + } + catch(const CosNotifyChannelAdmin::ProxyNotFound&) + { + } + } + } + + return false; +} + TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h index 066c5fca625..d83db0a3be1 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h @@ -9,6 +9,7 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "orbsvcs/Notify/MonitorControlExt/NotifyMonitoringExtS.h" #include "orbsvcs/Notify/MonitorControl/Statistic.h" #include "orbsvcs/Notify/EventChannel.h" #include "ace/Vector_T.h" @@ -16,9 +17,11 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL class TAO_Notify_ThreadPool_Task; +class TAO_Statistic_Registry; class TAO_Notify_MC_Ext_Export TAO_MonitorEventChannel: - public TAO_Notify_EventChannel + public virtual TAO_Notify_EventChannel, + public virtual POA_NotifyMonitoringExt::EventChannel { public: /// Construct a named event channel and associate various @@ -31,53 +34,109 @@ public: /// Return the name of this event channel const ACE_CString& name (void) const; + /// Register a single statistic with the EC and statistic registry + bool register_statistic (const ACE_CString& name, + TAO_Statistic* stat, + TAO_Statistic_Registry* instance = 0); + + /// Unregister a single statistic from the EC and statistic registry + bool unregister_statistic (const ACE_CString& name, + TAO_Statistic_Registry* instance = 0); + + /// Keep track of the supplier name associated with the proxy id void map_supplier_proxy (CosNotifyChannelAdmin::ProxyID id, const ACE_CString& name); + + /// Keep track of the consumer name associated with the proxy id void map_consumer_proxy (CosNotifyChannelAdmin::ProxyID id, const ACE_CString& name); - virtual void cleanup_proxy (CosNotifyChannelAdmin::ProxyID id - , bool is_supplier); + /// Remove the supplier/consumer proxy id/name mapping + virtual void cleanup_proxy (CosNotifyChannelAdmin::ProxyID id, + bool is_supplier); + + /// Remove the consumer admin id/name mapping + virtual void remove_consumeradmin (CosNotifyChannelAdmin::AdminID id); + + /// Remove the supplier admin id/name mapping + virtual void remove_supplieradmin (CosNotifyChannelAdmin::AdminID id); /// Add the statistics for this event channel. If the ec was /// constructed with a non-zero length string, this method will /// do nothing. void add_stats (const char* name = 0); + virtual CosNotifyChannelAdmin::ConsumerAdmin_ptr + new_for_consumers (CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id); + + virtual CosNotifyChannelAdmin::ConsumerAdmin_ptr + named_new_for_consumers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id, + const char* name); + + virtual CosNotifyChannelAdmin::SupplierAdmin_ptr + new_for_suppliers (CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id); + + virtual CosNotifyChannelAdmin::SupplierAdmin_ptr + named_new_for_suppliers ( + CosNotifyChannelAdmin::InterFilterGroupOperator op, + CosNotifyChannelAdmin::AdminID_out id, + const char* name); + private: size_t get_consumers (TAO_Statistic::List* names); size_t get_suppliers (TAO_Statistic::List* names); + size_t get_consumeradmins (TAO_Statistic::List* names); + size_t get_supplieradmins (TAO_Statistic::List* names); TAO_Notify_ThreadPool_Task* get_threadpool_task ( CosNotifyChannelAdmin::AdminID id); size_t calculate_queue_size (bool count); double get_oldest_event (void); void determine_slowest_consumer (TAO_Statistic::List* names); + bool destroy_consumer (CosNotifyChannelAdmin::ProxyID id); + bool destroy_supplier (CosNotifyChannelAdmin::ProxyID id); - void unmap_supplier_proxy (CosNotifyChannelAdmin::ProxyID id); - void unmap_consumer_proxy (CosNotifyChannelAdmin::ProxyID id); - friend class EventChannelConsumers; - friend class EventChannelSuppliers; + friend class RemoveConsumerSupplierControl; + friend class EventChannelConsumersSuppliers; + friend class EventChannelConsumerSupplierAdmins; friend class SlowestConsumers; friend class QueuedEvents; friend class OldestEvent; - typedef ACE_Hash_Map_Manager<CosNotifyChannelAdmin::ProxyID, + typedef ACE_Vector<ACE_CString> NameList; + typedef ACE_Hash_Map_Manager<CORBA::Long, ACE_CString, ACE_SYNCH_NULL_MUTEX> Map; + size_t get_admins (Map& map, + const CosNotifyChannelAdmin::AdminIDSeq& ids, + TAO_Statistic::List* names); bool is_duplicate_name (const Map& map, const ACE_CString& name) const; + void remove_list_name (NameList& list, const ACE_CString& name); + ACE_CString name_; - ACE_Vector<ACE_CString> stat_names_; - ACE_Vector<ACE_CString> control_names_; + + mutable ACE_SYNCH_MUTEX names_mutex_; + NameList stat_names_; + NameList control_names_; mutable ACE_SYNCH_RW_MUTEX supplier_mutex_; Map supplier_map_; mutable ACE_SYNCH_RW_MUTEX consumer_mutex_; Map consumer_map_; + + mutable ACE_SYNCH_RW_MUTEX supplieradmin_mutex_; + Map supplieradmin_map_; + + mutable ACE_SYNCH_RW_MUTEX consumeradmin_mutex_; + Map consumeradmin_map_; }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannelFactory.h b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannelFactory.h index 77ff8d7965e..11ae9ff8227 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannelFactory.h +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorEventChannelFactory.h @@ -43,6 +43,8 @@ public: const CosNotification::AdminProperties& initial_admin, CosNotifyChannelAdmin::ChannelID_out id); + /// Hook into the remove() call from the base class and remove it from + /// our map before passing control back to the base. virtual void remove (TAO_Notify_EventChannel* channel); // This is public to allow the Unbinder class access diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.cpp b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.cpp index aceb58c35e3..46d098e3737 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.cpp @@ -1,15 +1,79 @@ // $Id$ #include "orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.h" #include "orbsvcs/Notify/MonitorControlExt/MonitorEventChannel.h" +#include "orbsvcs/Notify/MonitorControl/Control_Registry.h" +#include "orbsvcs/Notify/MonitorControl/Control.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL +class SupplierAdminControl: public TAO_NS_Control +{ +public: + SupplierAdminControl (TAO_MonitorEventChannel* ec, + const ACE_CString& name, + CosNotifyChannelAdmin::AdminID id) + : TAO_NS_Control (name.c_str ()), + ec_ (ec), + id_ (id) { + } + + virtual bool execute (const char* command) { + if (ACE_OS::strcmp (command, TAO_NS_CONTROL_REMOVE_SUPPLIERADMIN) == 0) + { + CosNotifyChannelAdmin::SupplierAdmin_var admin = + this->ec_->get_supplieradmin (this->id_); + if (!CORBA::is_nil (admin.in ())) + admin->destroy (); + } + else + { + return false; + } + + return true; + } + +private: + TAO_MonitorEventChannel* ec_; + CosNotifyChannelAdmin::AdminID id_; +}; + TAO_MonitorSupplierAdmin::TAO_MonitorSupplierAdmin (void) { } TAO_MonitorSupplierAdmin::~TAO_MonitorSupplierAdmin (void) { + // First, make sure we can get down to the real ec type + TAO_MonitorEventChannel* ec = + dynamic_cast<TAO_MonitorEventChannel*> (this->ec_.get ()); + if (ec != 0) + { + ec->remove_supplieradmin (this->id ()); + TAO_Control_Registry* cinstance = TAO_Control_Registry::instance (); + cinstance->remove (this->control_name_); + } +} + +void +TAO_MonitorSupplierAdmin::register_stats_controls ( + TAO_MonitorEventChannel* mec, + const ACE_CString& base) +{ + this->control_name_ = base; + SupplierAdminControl* control = 0; + ACE_NEW_THROW_EX (control, + SupplierAdminControl (mec, + this->control_name_.c_str (), + this->id ()), + CORBA::NO_MEMORY ()); + TAO_Control_Registry* cinstance = TAO_Control_Registry::instance (); + if (!cinstance->add (control)) + { + delete control; + ACE_ERROR ((LM_ERROR, "Unable to add control: %s\n", + this->control_name_.c_str ())); + } } CosNotifyChannelAdmin::ProxyConsumer_ptr diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.h b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.h index ea8419cf83d..ca4407c8a96 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.h +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/MonitorSupplierAdmin.h @@ -14,6 +14,8 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL +class TAO_MonitorEventChannel; + class TAO_Notify_MC_Ext_Export TAO_MonitorSupplierAdmin: public virtual TAO_Notify_SupplierAdmin, public virtual POA_NotifyMonitoringExt::SupplierAdmin @@ -25,6 +27,10 @@ public: /// Remove the statistics for this event channel ~TAO_MonitorSupplierAdmin (void); + /// Register our statistic and control objects + void register_stats_controls (TAO_MonitorEventChannel* mec, + const ACE_CString& name); + virtual CosNotifyChannelAdmin::ProxyConsumer_ptr obtain_named_notification_push_consumer ( CosNotifyChannelAdmin::ClientType ctype, @@ -35,6 +41,9 @@ public: obtain_notification_push_consumer ( CosNotifyChannelAdmin::ClientType ctype, CosNotifyChannelAdmin::ProxyID_out proxy_id); + +private: + ACE_CString control_name_; }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/NotifyMonitoringExt.idl b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/NotifyMonitoringExt.idl index ae76385bf89..cc3c37acc04 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/NotifyMonitoringExt.idl +++ b/TAO/orbsvcs/orbsvcs/Notify/MonitorControlExt/NotifyMonitoringExt.idl @@ -44,6 +44,18 @@ module NotifyMonitoringExt /// This corresponds to the EventChannel supplier names const string EventChannelSupplierNames = "SupplierNames"; + /// This corresponds to the EventChannel consumer admin count + const string EventChannelConsumerAdminCount = "ConsumerAdminCount"; + + /// This corresponds to the EventChannel supplier admin count + const string EventChannelSupplierAdminCount = "SupplierAdminCount"; + + /// This corresponds to the EventChannel consumer admin names + const string EventChannelConsumerAdminNames = "ConsumerAdminNames"; + + /// This corresponds to the EventChannel supplier admin names + const string EventChannelSupplierAdminNames = "SupplierAdminNames"; + /// This corresponds to the EventChannel queue size const string EventChannelQueueSize = "QueueSize"; @@ -86,6 +98,25 @@ module NotifyMonitoringExt NameMapError); }; + interface EventChannel: CosNotifyChannelAdmin::EventChannel + { + CosNotifyChannelAdmin::ConsumerAdmin + named_new_for_consumers( + in CosNotifyChannelAdmin::InterFilterGroupOperator op, + out CosNotifyChannelAdmin::AdminID id, + in string name) + raises (NameAlreadyUsed, + NameMapError); + + CosNotifyChannelAdmin::SupplierAdmin + named_new_for_suppliers( + in CosNotifyChannelAdmin::InterFilterGroupOperator op, + out CosNotifyChannelAdmin::AdminID id, + in string name) + raises (NameAlreadyUsed, + NameMapError); + }; + interface EventChannelFactory: NotifyExt::EventChannelFactory { CosNotifyChannelAdmin::EventChannel diff --git a/TAO/orbsvcs/tests/Notify/MC/test_monitor.cpp b/TAO/orbsvcs/tests/Notify/MC/test_monitor.cpp index 905df5fdc20..5e1d10ac76a 100644 --- a/TAO/orbsvcs/tests/Notify/MC/test_monitor.cpp +++ b/TAO/orbsvcs/tests/Notify/MC/test_monitor.cpp @@ -75,6 +75,13 @@ MonitorTestInterface_i::running (MonitorTestInterface::Which proc) if (num.last != 1) ACE_ERROR ((LM_ERROR, "ERROR: There should be only one Consumer\n")); + str = this->base_ + NotifyMonitoringExt::EventChannelConsumerAdminCount; + data = nsm_->get_statistic(str.c_str ()); + num = data->num (); + if (num.last != 1) + ACE_ERROR ((LM_ERROR, + "ERROR: There should be only one ConsumerAdmin\n")); + str = this->base_ + NotifyMonitoringExt::EventChannelQueueElementCount; data = nsm_->get_statistic(str.c_str ()); num = data->num (); @@ -87,6 +94,13 @@ MonitorTestInterface_i::running (MonitorTestInterface::Which proc) num = data->num (); if (num.last != 1) ACE_ERROR ((LM_ERROR, "ERROR: There should be only one Supplier\n")); + + str = this->base_ + NotifyMonitoringExt::EventChannelSupplierAdminCount; + data = nsm_->get_statistic(str.c_str ()); + num = data->num (); + if (num.last != 1) + ACE_ERROR ((LM_ERROR, + "ERROR: There should be only one SupplierAdmin\n")); break; default: ACE_ERROR ((LM_ERROR, "ERROR: Impossible enum value %d\n", proc)); @@ -98,6 +112,7 @@ MonitorTestInterface_i::finished (MonitorTestInterface::Which proc) { ACE_CString str; CosNotification::NotificationServiceMonitorControl::Data_var data; + CosNotification::NotificationServiceMonitorControl::NameList list; CosNotification::NotificationServiceMonitorControl::Numeric num; switch(proc) @@ -123,6 +138,22 @@ MonitorTestInterface_i::finished (MonitorTestInterface::Which proc) if (num.last == 0) ACE_ERROR ((LM_ERROR, "ERROR: There should be at least one " "event queued\n")); + + str = this->base_ + NotifyMonitoringExt::EventChannelConsumerAdminNames; + data = nsm_->get_statistic(str.c_str ()); + list = data->list (); + for (CORBA::ULong i = 0; i < list.length (); i++) + { + str = list[i].in (); + str += "/"; + str += NotifyMonitoringExt::EventChannelQueueSize; + data = nsm_->get_statistic(str.c_str ()); + num = data->num (); + ACE_DEBUG ((LM_DEBUG, "Average Queue Size: %f\n", num.average)); + if (num.average == 0.0) + ACE_ERROR ((LM_ERROR, "ERROR: The average should be non-zero\n")); + } + break; default: ACE_ERROR ((LM_ERROR, "ERROR: Impossible enum value %d\n", proc)); diff --git a/TAO/orbsvcs/tests/unit/Notify/MC/Control/Control.cpp b/TAO/orbsvcs/tests/unit/Notify/MC/Control/Control.cpp index 04e35369608..82027fa3eb5 100644 --- a/TAO/orbsvcs/tests/unit/Notify/MC/Control/Control.cpp +++ b/TAO/orbsvcs/tests/unit/Notify/MC/Control/Control.cpp @@ -11,8 +11,14 @@ public: : TAO_NS_Control (name.c_str ()) { } - virtual void execute (const char* cmd) { + virtual bool execute (const char* cmd) { command = cmd; + if (ACE_OS::strcmp (cmd, TAO_NS_CONTROL_SHUTDOWN) != 0) + { + return false; + } + + return true; } }; @@ -34,7 +40,12 @@ ACE_TMAIN (int, ACE_TCHAR*[]) if (name != ct.name ()) error ("The TAO_NS_Control name does not work"); - ct.execute (TAO_NS_CONTROL_SHUTDOWN); + if (ct.execute (TAO_NS_CONTROL_REMOVE_CONSUMER)) + error ("The control object should have returned false"); + + if (!ct.execute (TAO_NS_CONTROL_SHUTDOWN)) + error ("The control object shouldn't have returned false"); + if (command != TAO_NS_CONTROL_SHUTDOWN) error ("The TAO_NS_Control callback does not work"); } diff --git a/TAO/orbsvcs/tests/unit/Notify/MC/MonitorControlExt/MonitorControlExt.cpp b/TAO/orbsvcs/tests/unit/Notify/MC/MonitorControlExt/MonitorControlExt.cpp index c7366e24f73..c242879d0b6 100644 --- a/TAO/orbsvcs/tests/unit/Notify/MC/MonitorControlExt/MonitorControlExt.cpp +++ b/TAO/orbsvcs/tests/unit/Notify/MC/MonitorControlExt/MonitorControlExt.cpp @@ -50,6 +50,10 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) CosNotifyChannelAdmin::EventChannel_var ec = monitor_ec_factory->create_named_channel (qos_prop, admin_prop, id, ec_name.c_str ()); + NotifyMonitoringExt::EventChannel_var mec = + NotifyMonitoringExt::EventChannel::_narrow (ec); + if (CORBA::is_nil (mec.in ())) + error ("Unable to narrow the event channel"); try { @@ -104,7 +108,26 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) CosNotifyChannelAdmin::AdminID aid; CosNotifyChannelAdmin::SupplierAdmin_var admin = - ec->new_for_suppliers (CosNotifyChannelAdmin::AND_OP, aid); + mec->named_new_for_suppliers (CosNotifyChannelAdmin::AND_OP, aid, + "TestSupplierAdmin"); + + try + { + admin = + mec->named_new_for_suppliers (CosNotifyChannelAdmin::AND_OP, + aid, "TestSupplierAdmin"); + error("Expected a SupplierAdmin " + "NotifyMonitoringExt::NameAlreadyUsed exception"); + } + catch (const NotifyMonitoringExt::NameAlreadyUsed&) + { + }; + + // We should be able to create another one with the same name + admin->destroy (); + admin = + mec->named_new_for_suppliers (CosNotifyChannelAdmin::AND_OP, aid, + "TestSupplierAdmin"); NotifyMonitoringExt::SupplierAdmin_var madmin = NotifyMonitoringExt::SupplierAdmin::_narrow (admin.in ()); @@ -158,6 +181,79 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) "NotifyMonitoringExt::NameAlreadyUsed exception"); } + CosNotifyChannelAdmin::ConsumerAdmin_var cadmin = + mec->named_new_for_consumers (CosNotifyChannelAdmin::AND_OP, aid, + "TestConsumerAdmin"); + + try + { + cadmin = + mec->named_new_for_consumers (CosNotifyChannelAdmin::AND_OP, + aid, "TestConsumerAdmin"); + error("Expected a ConsumerAdmin " + "NotifyMonitoringExt::NameAlreadyUsed exception"); + } + catch (const NotifyMonitoringExt::NameAlreadyUsed&) + { + }; + + // We should be able to create another one with the same name + cadmin->destroy (); + cadmin = + mec->named_new_for_consumers (CosNotifyChannelAdmin::AND_OP, aid, + "TestConsumerAdmin"); + + NotifyMonitoringExt::ConsumerAdmin_var mcadmin = + NotifyMonitoringExt::ConsumerAdmin::_narrow (cadmin.in ()); + if (CORBA::is_nil (mcadmin.in ())) + error("Could not narrow the consumer admin"); + + CosNotifyChannelAdmin::ProxySupplier_var supproxy = + mcadmin->obtain_named_notification_push_supplier ( + CosNotifyChannelAdmin::STRUCTURED_EVENT, pid, "consumer"); + + try + { + CosNotifyChannelAdmin::ProxySupplier_var fake = + mcadmin->obtain_named_notification_push_supplier + (CosNotifyChannelAdmin::STRUCTURED_EVENT, pid, "consumer"); + error("Expected a ProxySupplier " + "NotifyMonitoringExt::NameAlreadyUsed exception"); + } + catch (const NotifyMonitoringExt::NameAlreadyUsed&) + { + // This is expected + } + + stat_name = ecf_name + "/" + ec_name + "/" + + ACE_CString (NotifyMonitoringExt::EventChannelConsumerCount); + stat = instance->get (stat_name); + if (stat == 0) + error("Could not find the event channel consumers statistic"); + + stat->calculate (); + count = stat->last_sample (); + if (count != 1) + error("Invalid consumer count"); + + CosNotifyChannelAdmin::StructuredProxyPushSupplier_var push_supproxy + = CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow + (supproxy.in()); + ACE_ASSERT (!CORBA::is_nil (push_supproxy.in())); + push_supproxy->disconnect_structured_push_supplier(); + + try + { + CosNotifyChannelAdmin::ProxySupplier_var fake = + mcadmin->obtain_named_notification_push_supplier + (CosNotifyChannelAdmin::STRUCTURED_EVENT, pid, "consumer"); + } + catch (const NotifyMonitoringExt::NameAlreadyUsed&) + { + error("Unexpected ProxySupplier " + "NotifyMonitoringExt::NameAlreadyUsed exception"); + } + TAO_MonitorManager::shutdown (); orb->destroy (); } |