diff options
Diffstat (limited to 'qpid/cpp')
38 files changed, 353 insertions, 282 deletions
diff --git a/qpid/cpp/managementgen/generate.py b/qpid/cpp/managementgen/generate.py index 197deec4f1..6024173f67 100755 --- a/qpid/cpp/managementgen/generate.py +++ b/qpid/cpp/managementgen/generate.py @@ -158,7 +158,8 @@ class Generator: raise ValueError ("path is not directory: %s" % path) if not exists: pair = os.path.split (path) - self.createPath (pair[0]) + if pair[0] != '': + self.createPath (pair[0]) os.mkdir (path) def normalize (self, path): diff --git a/qpid/cpp/managementgen/templates/Class.cpp b/qpid/cpp/managementgen/templates/Class.cpp index 100e306fe0..289427d742 100644 --- a/qpid/cpp/managementgen/templates/Class.cpp +++ b/qpid/cpp/managementgen/templates/Class.cpp @@ -23,7 +23,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" -#include "qpid/management/ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" #include "/*MGEN:Class.NameCap*/.h" /*MGEN:Class.MethodArgIncludes*/ diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h index 557c7a45d5..8a4dc1006a 100644 --- a/qpid/cpp/managementgen/templates/Class.h +++ b/qpid/cpp/managementgen/templates/Class.h @@ -79,7 +79,6 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject public: friend class Package/*MGEN:Class.NamePackageCap*/; - typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr; /*MGEN:Class.NameCap*/ (ManagementAgent* agent, Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); diff --git a/qpid/cpp/managementgen/templates/Package.cpp b/qpid/cpp/managementgen/templates/Package.cpp index 0c5af8d71d..8bb2d42c47 100644 --- a/qpid/cpp/managementgen/templates/Package.cpp +++ b/qpid/cpp/managementgen/templates/Package.cpp @@ -25,7 +25,7 @@ using namespace qpid::management; -Package/*MGEN:Schema.PackageNameCap*/::Package/*MGEN:Schema.PackageNameCap*/ (ManagementAgent::shared_ptr agent) +Package/*MGEN:Schema.PackageNameCap*/::Package/*MGEN:Schema.PackageNameCap*/ (ManagementAgent* agent) { /*MGEN:Schema.ClassRegisters*/ } diff --git a/qpid/cpp/managementgen/templates/Package.h b/qpid/cpp/managementgen/templates/Package.h index 214f811a1f..3f3ac35ffc 100644 --- a/qpid/cpp/managementgen/templates/Package.h +++ b/qpid/cpp/managementgen/templates/Package.h @@ -23,7 +23,7 @@ /*MGEN:Root.Disclaimer*/ -#include "qpid/management/ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" namespace qpid { namespace management { @@ -31,7 +31,7 @@ namespace management { class Package/*MGEN:Schema.PackageNameCap*/ { public: - Package/*MGEN:Schema.PackageNameCap*/ (ManagementAgent::shared_ptr agent); + Package/*MGEN:Schema.PackageNameCap*/ (ManagementAgent* agent); ~Package/*MGEN:Schema.PackageNameCap*/ () {} }; diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in index a26f61b125..8521d57ecf 100644 --- a/qpid/cpp/qpidc.spec.in +++ b/qpid/cpp/qpidc.spec.in @@ -130,6 +130,7 @@ make check %_includedir/qpid/sys %_includedir/qpid/log %_includedir/qpid/management +%_includedir/qpid/agent %_libdir/libqpidcommon.so %_libdir/libqpidclient.so %_bindir/managementgen diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 6d281e14ae..5b4a16429a 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -367,6 +367,7 @@ nobase_include_HEADERS = \ qpid/InlineAllocator.h \ qpid/memory.h \ qpid/shared_ptr.h \ + qpid/agent/ManagementAgent.h \ qpid/broker/Broker.h \ qpid/broker/SessionAdapter.h \ qpid/broker/Exchange.h \ @@ -523,7 +524,6 @@ nobase_include_HEADERS = \ qpid/log/Statement.h \ qpid/management/Args.h \ qpid/management/Manageable.h \ - qpid/management/ManagementAgent.h \ qpid/management/ManagementBroker.h \ qpid/management/ManagementExchange.h \ qpid/management/ManagementObject.h \ diff --git a/qpid/cpp/src/qpid/agent/ManagementAgent.h b/qpid/cpp/src/qpid/agent/ManagementAgent.h new file mode 100644 index 0000000000..97fc827f2a --- /dev/null +++ b/qpid/cpp/src/qpid/agent/ManagementAgent.h @@ -0,0 +1,116 @@ +#ifndef _qpid_agent_ManagementAgent_ +#define _qpid_agent_ManagementAgent_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/ManagementObject.h" +#include "qpid/management/Manageable.h" + +namespace qpid { +namespace management { + +class ManagementAgent +{ + public: + + ManagementAgent () {} + virtual ~ManagementAgent () {} + + static ManagementAgent* getAgent(); + + virtual int getMaxThreads() = 0; + + // Connect to a management broker + // + // brokerHost - Hostname or IP address (dotted-quad) of broker. + // + // brokerPort - TCP port of broker. + // + // intervalSeconds - The interval (in seconds) that this agent shall use + // between broadcast updates to the broker. + // + // useExternalThread - If true, the thread of control used for callbacks + // must be supplied by the user of the object (via the + // pollCallbacks method). + // + // If false, callbacks shall be invoked on the management + // agent's thread. In this case, the callback implementations + // MUST be thread safe. + // + virtual void init (std::string brokerHost = "localhost", + uint16_t brokerPort = 5672, + uint16_t intervalSeconds = 10, + bool useExternalThread = false) = 0; + + // Register a schema with the management agent. This is normally called by the + // package initializer generated by the management code generator. + // + virtual void + RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) = 0; + + // Add a management object to the agent. Once added, this object shall be visible + // in the greater management context. + // + // Please note that ManagementObject instances are not explicitly deleted from + // the management agent. When the core object represented by a management object + // is deleted, the "resourceDestroy" method on the management object must be called. + // It will then be reclaimed in due course by the management agent. + // + // Once a ManagementObject instance is added to the agent, the agent then owns the + // instance. The caller MUST NOT free the resources of the instance at any time. + // When it is no longer needed, invoke its "resourceDestroy" method and discard the + // pointer. This allows the management agent to report the deletion of the object + // in an orderly way. + // + virtual uint64_t addObject (ManagementObject* objectPtr, + uint32_t persistId = 0, + uint32_t persistBank = 4) = 0; + + // If "useExternalThread" was set to true in init, this method must + // be called to provide a thread for any pending method calls that have arrived. + // The method calls for ManagementObject instances shall be invoked synchronously + // during the execution of this method. + // + // callLimit may optionally be used to limit the number of callbacks invoked. + // if 0, no limit is imposed. + // + // The return value is the number of callbacks that remain queued after this + // call is complete. It can be used to determine whether or not further calls + // to pollCallbacks are necessary to clear the backlog. If callLimit is zero, + // the return value will also be zero. + // + virtual uint32_t pollCallbacks (uint32_t callLimit = 0) = 0; + + // If "useExternalThread" was set to true in the constructor, this method provides + // a standard file descriptor that can be used in a select statement to signal that + // there are method callbacks ready (i.e. that "pollCallbacks" will result in at + // least one method call). When this fd is ready-for-read, pollCallbacks may be + // invoked. Calling pollCallbacks shall reset the ready-to-read state of the fd. + // + virtual int getSignalFd (void) = 0; + +}; + +}} + +#endif /*!_qpid_agent_ManagementAgent_*/ diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 9274de0555..9e49404bae 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -22,7 +22,7 @@ #include "ConnectionState.h" #include "LinkRegistry.h" -#include "qpid/management/ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" @@ -37,15 +37,14 @@ namespace broker { Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const management::ArgsLinkBridge& _args) : - link(_link), id(_id), args(_args), + link(_link), id(_id), args(_args), mgmtObject(0), listener(l), name(Uuid(true).str()), persistenceId(0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); - if (agent.get() != 0) { - mgmtObject = management::Bridge::shared_ptr - (new management::Bridge(agent.get(), this, link, id, args.i_durable, args.i_src, args.i_dest, - args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes)); + ManagementAgent* agent = ManagementAgent::getAgent(); + if (agent != 0) { + mgmtObject = new management::Bridge(agent, this, link, id, args.i_durable, args.i_src, args.i_dest, + args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, + args.i_tag, args.i_excludes); if (!args.i_durable) agent->addObject(mgmtObject); } @@ -109,7 +108,7 @@ void Bridge::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::getAgent (); agent->addObject (mgmtObject, id); } persistenceId = id; @@ -175,9 +174,9 @@ uint32_t Bridge::encodedSize() const + args.i_excludes.size() + 1; } -management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const +management::ManagementObject* Bridge::GetManagementObject (void) const { - return dynamic_pointer_cast<management::ManagementObject>(mgmtObject); + return (management::ManagementObject*) mgmtObject; } management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, management::Args& /*args*/) diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 594a0ef508..06fba25268 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -53,7 +53,7 @@ public: void destroy(); bool isDurable() { return args.i_durable; } - management::ManagementObject::shared_ptr GetManagementObject() const; + management::ManagementObject* GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args); // PersistableConfig: @@ -70,10 +70,10 @@ private: std::auto_ptr<framing::AMQP_ServerProxy> peer; Link* link; - framing::ChannelId id; - management::ArgsLinkBridge args; - management::Bridge::shared_ptr mgmtObject; - CancellationListener listener; + framing::ChannelId id; + management::ArgsLinkBridge args; + management::Bridge* mgmtObject; + CancellationListener listener; std::string name; mutable uint64_t persistenceId; }; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index c391d25788..ba510f7894 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -137,13 +137,13 @@ Broker::Broker(const Broker::Options& conf) : ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), conf.mgmtPubInterval, this, conf.workerThreads + 3); managementAgent = management::ManagementAgent::getAgent (); - ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval); + ((ManagementBroker*) managementAgent)->setInterval (conf.mgmtPubInterval); qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); systemObject = System::shared_ptr (system); - mgmtObject = management::Broker::shared_ptr (new management::Broker (managementAgent.get(), this, system, conf.port)); + mgmtObject = new management::Broker (managementAgent, this, system, conf.port); mgmtObject->set_workerThreads (conf.workerThreads); mgmtObject->set_maxConns (conf.maxConnections); mgmtObject->set_connBacklog (conf.connectionBacklog); @@ -199,9 +199,9 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); - ((ManagementBroker*) managementAgent.get())->setExchange (mExchange, dExchange); + ((ManagementBroker*) managementAgent)->setExchange (mExchange, dExchange); dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent - ((ManagementBroker*) managementAgent.get()); + ((ManagementBroker*) managementAgent); } else QPID_LOG(info, "Management not enabled"); @@ -298,9 +298,9 @@ Broker::~Broker() { QPID_LOG(notice, "Shut down"); } -ManagementObject::shared_ptr Broker::GetManagementObject(void) const +ManagementObject* Broker::GetManagementObject(void) const { - return dynamic_pointer_cast<ManagementObject> (mgmtObject); + return (ManagementObject*) mgmtObject; } Manageable* Broker::GetVhostObject(void) const diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 00fb4b9995..9a9f502bf0 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -123,10 +123,9 @@ class Broker : public sys::Runnable, public Plugin::Target, SessionManager& getSessionManager() { return sessionManager; } ConnectionManager& getConnectionManager() { return connectionManager; } - management::ManagementObject::shared_ptr GetManagementObject (void) const; - management::Manageable* GetVhostObject (void) const; - management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args); + management::ManagementObject* GetManagementObject (void) const; + management::Manageable* GetVhostObject (void) const; + management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); /** Add to the broker's protocolFactorys */ void registerProtocolFactory(boost::shared_ptr<sys::ProtocolFactory>); @@ -161,10 +160,10 @@ class Broker : public sys::Runnable, public Plugin::Target, DtxManager dtxManager; SessionManager sessionManager; ConnectionManager connectionManager; - management::ManagementAgent::shared_ptr managementAgent; - management::Broker::shared_ptr mgmtObject; - Vhost::shared_ptr vhostObject; - System::shared_ptr systemObject; + management::ManagementAgent* managementAgent; + management::Broker* mgmtObject; + Vhost::shared_ptr vhostObject; + System::shared_ptr systemObject; void declareStandardExchange(const std::string& name, const std::string& type); }; diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index f6d35ff6ca..bb99c61cdd 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -25,7 +25,7 @@ #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/management/ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" #include <boost/bind.hpp> #include <boost/ptr_container/ptr_vector.hpp> @@ -53,6 +53,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std isLink(isLink_), mgmtClosing(false), mgmtId(mgmtId_), + mgmtObject(0), links(broker_.getLinks()), lastInHandler(*this), inChain(lastInHandler) @@ -64,11 +65,10 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); + ManagementAgent* agent = ManagementAgent::getAgent(); - if (agent.get() != 0) - mgmtObject = management::Connection::shared_ptr - (new management::Connection(agent.get(), this, parent, mgmtId, !isLink)); + if (agent != 0) + mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink); agent->addObject(mgmtObject); } } @@ -82,7 +82,7 @@ void Connection::requestIOProcessing(boost::function0<void> callback) Connection::~Connection() { - if (mgmtObject.get() != 0) + if (mgmtObject != 0) mgmtObject->resourceDestroy(); if (isLink) links.notifyClosed(mgmtId); @@ -105,7 +105,7 @@ void Connection::receivedLast(framing::AMQFrame& frame){ void Connection::recordFromServer(framing::AMQFrame& frame) { - if (mgmtObject.get() != 0) + if (mgmtObject != 0) { mgmtObject->inc_framesToClient(); mgmtObject->inc_bytesToClient(frame.size()); @@ -114,7 +114,7 @@ void Connection::recordFromServer(framing::AMQFrame& frame) void Connection::recordFromClient(framing::AMQFrame& frame) { - if (mgmtObject.get() != 0) + if (mgmtObject != 0) { mgmtObject->inc_framesFromClient(); mgmtObject->inc_bytesFromClient(frame.size()); @@ -134,7 +134,7 @@ string Connection::getAuthCredentials() if (!isLink) return string(); - if (mgmtObject.get() != 0) + if (mgmtObject != 0) { if (links.getAuthMechanism(mgmtId) == "ANONYMOUS") mgmtObject->set_authIdentity("anonymous"); @@ -154,7 +154,7 @@ void Connection::notifyConnectionForced(const string& text) void Connection::setUserId(const string& userId) { ConnectionState::setUserId(userId); - if (mgmtObject.get() != 0) + if (mgmtObject != 0) mgmtObject->set_authIdentity(userId); } @@ -222,9 +222,9 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *ptr_map_ptr(i); } -ManagementObject::shared_ptr Connection::GetManagementObject(void) const +ManagementObject* Connection::GetManagementObject(void) const { - return dynamic_pointer_cast<ManagementObject>(mgmtObject); + return (ManagementObject*) mgmtObject; } Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) @@ -237,7 +237,7 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) { case management::Connection::METHOD_CLOSE : mgmtClosing = true; - if (mgmtObject.get()) mgmtObject->set_closing(1); + if (mgmtObject != 0) mgmtObject->set_closing(1); out->activateOutput(); status = Manageable::STATUS_OK; break; diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 6b3530366d..717e1a6270 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -79,7 +79,7 @@ class Connection : public sys::ConnectionInputHandler, void closeChannel(framing::ChannelId channel); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); @@ -107,7 +107,7 @@ class Connection : public sys::ConnectionInputHandler, bool mgmtClosing; const std::string mgmtId; boost::function0<void> ioCallback; - management::Connection::shared_ptr mgmtObject; + management::Connection* mgmtObject; LinkRegistry& links; framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler; framing::FrameHandler::Chain inChain; diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 84a5362766..4aa68bee9c 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -29,7 +29,7 @@ using qpid::management::Manageable; DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -37,7 +37,7 @@ DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent) : Exchange(_name, _durable, _args, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -53,9 +53,9 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con if (i == queues.end()) { Binding::shared_ptr binding (new Binding (routingKey, queue, this)); bindings[routingKey].push_back(binding); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); } return true; } else{ @@ -77,9 +77,9 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c if (queues.empty()) { bindings.erase(routingKey); } - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } else { @@ -95,25 +95,25 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie for(i = queues.begin(); i != queues.end(); i++, count++) { msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding.get() != 0) + if ((*i)->mgmtBinding != 0) (*i)->mgmtBinding->inc_msgMatched (); } if(!count){ QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_msgDrops (); mgmtExchange->inc_byteDrops (msg.contentSize ()); } } else { - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_msgRoutes (count); mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); } } - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_msgReceives (); mgmtExchange->inc_byteReceives (msg.contentSize ()); } diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index c72b148338..e7de5615ff 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -21,7 +21,7 @@ #include "Exchange.h" #include "ExchangeRegistry.h" -#include "qpid/management/ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" using namespace qpid::broker; using qpid::framing::Buffer; @@ -32,15 +32,14 @@ using qpid::management::Manageable; using qpid::management::Args; Exchange::Exchange (const string& _name, Manageable* parent) : - name(_name), durable(false), persistenceId(0) + name(_name), durable(false), persistenceId(0), mgmtExchange(0) { if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - if (agent.get () != 0) + ManagementAgent* agent = ManagementAgent::getAgent (); + if (agent != 0) { - mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (agent.get(), this, parent, _name, durable)); + mgmtExchange = new management::Exchange (agent, this, parent, _name, durable); agent->addObject (mgmtExchange); } } @@ -48,15 +47,14 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) - : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0) + : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), mgmtExchange(0) { if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - if (agent.get () != 0) + ManagementAgent* agent = ManagementAgent::getAgent (); + if (agent != 0) { - mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (agent.get(), this, parent, _name, durable)); + mgmtExchange = new management::Exchange (agent, this, parent, _name, durable); if (!durable) { if (name == "") agent->addObject (mgmtExchange, 4, 1); // Special default exchange ID @@ -71,7 +69,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel Exchange::~Exchange () { - if (mgmtExchange.get () != 0) + if (mgmtExchange != 0) mgmtExchange->resourceDestroy (); } @@ -79,7 +77,7 @@ void Exchange::setPersistenceId(uint64_t id) const { if (mgmtExchange != 0 && persistenceId == 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::getAgent (); agent->addObject (mgmtExchange, id, 2); } persistenceId = id; @@ -115,26 +113,25 @@ uint32_t Exchange::encodedSize() const + args.size(); } -ManagementObject::shared_ptr Exchange::GetManagementObject (void) const +ManagementObject* Exchange::GetManagementObject (void) const { - return dynamic_pointer_cast<ManagementObject> (mgmtExchange); + return (ManagementObject*) mgmtExchange; } Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent, FieldTable _args) - : queue(_queue), key(_key), args(_args) + : queue(_queue), key(_key), args(_args), mgmtBinding(0) { if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - if (agent.get() != 0) + ManagementAgent* agent = ManagementAgent::getAgent (); + if (agent != 0) { - ManagementObject::shared_ptr mo = queue->GetManagementObject(); - if (mo.get() != 0) + ManagementObject* mo = queue->GetManagementObject(); + if (mo != 0) { uint64_t queueId = mo->getObjectId(); - mgmtBinding = management::Binding::shared_ptr - (new management::Binding (agent.get(), this, (Manageable*) parent, queueId, key, args)); + mgmtBinding = new management::Binding (agent, this, (Manageable*) parent, queueId, key, args); agent->addObject (mgmtBinding); } } @@ -143,13 +140,13 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang Exchange::Binding::~Binding () { - if (mgmtBinding.get () != 0) + if (mgmtBinding != 0) mgmtBinding->resourceDestroy (); } -ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const +ManagementObject* Exchange::Binding::GetManagementObject () const { - return dynamic_pointer_cast<ManagementObject> (mgmtBinding); + return (ManagementObject*) mgmtBinding; } Manageable::status_t Exchange::Binding::ManagementMethod (uint32_t, Args&) diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index ccd74299f8..f4ac4373e4 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -54,16 +54,16 @@ namespace qpid { Queue::shared_ptr queue; const std::string key; const framing::FieldTable args; - management::Binding::shared_ptr mgmtBinding; + management::Binding* mgmtBinding; Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0, framing::FieldTable args = framing::FieldTable ()); ~Binding (); - management::ManagementObject::shared_ptr GetManagementObject () const; + management::ManagementObject* GetManagementObject () const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); }; - management::Exchange::shared_ptr mgmtExchange; + management::Exchange* mgmtExchange; public: typedef boost::shared_ptr<Exchange> shared_ptr; @@ -98,7 +98,7 @@ namespace qpid { static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t, management::Args&) { return management::Manageable::STATUS_UNKNOWN_METHOD; } }; diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 3483562292..373e9ab1cc 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -28,7 +28,7 @@ using namespace qpid::sys; FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) : Exchange(_name, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -36,7 +36,7 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent) : Exchange(_name, _durable, _args, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -52,9 +52,9 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, if (i == bindings.end()) { Binding::shared_ptr binding (new Binding ("", queue, this)); bindings.push_back(binding); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); } return true; } else { @@ -72,9 +72,9 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* if (i != bindings.end()) { bindings.erase(i); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } else { @@ -88,11 +88,11 @@ void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){ msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding.get() != 0) + if ((*i)->mgmtBinding != 0) (*i)->mgmtBinding->inc_msgMatched (); } - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) { mgmtExchange->inc_msgReceives (); mgmtExchange->inc_byteReceives (msg.contentSize ()); diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 20d9617c8f..54519a7bf6 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -45,7 +45,7 @@ namespace { HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -53,7 +53,7 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent) : Exchange(_name, _durable, _args, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -89,9 +89,9 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co HeaderMap headerMap(*args, binding); bindings.push_back(headerMap); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); } return true; } else { @@ -114,9 +114,9 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, if (i != bindings.end()) { bindings.erase(i); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } else { @@ -133,11 +133,11 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) { if (match(i->first, *args)) msg.deliverTo(i->second->queue); - if (i->second->mgmtBinding.get() != 0) + if (i->second->mgmtBinding != 0) i->second->mgmtBinding->inc_msgMatched (); } - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) { mgmtExchange->inc_msgReceives (); mgmtExchange->inc_byteReceives (msg.contentSize ()); diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 87c0020dcb..5c470040e2 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -23,7 +23,7 @@ #include "LinkRegistry.h" #include "Broker.h" #include "Connection.h" -#include "qpid/management/ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" #include "qpid/management/Link.h" #include "boost/bind.hpp" #include "qpid/log/Statement.h" @@ -50,7 +50,7 @@ Link::Link(LinkRegistry* _links, management::Manageable* parent) : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), - persistenceId(0), broker(_broker), state(0), + persistenceId(0), mgmtObject(0), broker(_broker), state(0), visitCount(0), currentInterval(1), closing(false), @@ -59,11 +59,10 @@ Link::Link(LinkRegistry* _links, { if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); - if (agent.get() != 0) + ManagementAgent* agent = ManagementAgent::getAgent(); + if (agent != 0) { - mgmtObject = management::Link::shared_ptr - (new management::Link(agent.get(), this, parent, _host, _port, _useSsl, _durable)); + mgmtObject = new management::Link(agent, this, parent, _host, _port, _useSsl, _durable); if (!durable) agent->addObject(mgmtObject); } @@ -76,7 +75,7 @@ Link::~Link () if (state == STATE_OPERATIONAL && connection != 0) connection->close(); - if (mgmtObject.get () != 0) + if (mgmtObject != 0) mgmtObject->resourceDestroy (); } @@ -86,7 +85,7 @@ void Link::setStateLH (int newState) return; state = newState; - if (mgmtObject.get() == 0) + if (mgmtObject == 0) return; switch (state) @@ -109,7 +108,7 @@ void Link::startConnectionLH () boost::bind (&Link::closed, this, _1, _2)); } catch(std::exception& e) { setStateLH(STATE_WAITING); - if (mgmtObject.get() != 0) + if (mgmtObject != 0) mgmtObject->set_lastError (e.what()); } } @@ -142,7 +141,7 @@ void Link::closed (int, std::string text) if (state != STATE_FAILED) { setStateLH(STATE_WAITING); - if (mgmtObject.get() != 0) + if (mgmtObject != 0) mgmtObject->set_lastError (text); } @@ -259,7 +258,7 @@ void Link::notifyConnectionForced(const string text) Mutex::ScopedLock mutex(lock); setStateLH(STATE_FAILED); - if (mgmtObject.get() != 0) + if (mgmtObject != 0) mgmtObject->set_lastError(text); } @@ -267,7 +266,7 @@ void Link::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::getAgent (); agent->addObject (mgmtObject, id); } persistenceId = id; @@ -321,9 +320,9 @@ uint32_t Link::encodedSize() const + password.size() + 1; } -ManagementObject::shared_ptr Link::GetManagementObject (void) const +ManagementObject* Link::GetManagementObject (void) const { - return boost::dynamic_pointer_cast<ManagementObject> (mgmtObject); + return (ManagementObject*) mgmtObject; } Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args) diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index de757d112e..d425c49800 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -53,7 +53,7 @@ namespace qpid { string username; string password; mutable uint64_t persistenceId; - management::Link::shared_ptr mgmtObject; + management::Link* mgmtObject; Broker* broker; int state; uint32_t visitCount; @@ -123,7 +123,7 @@ namespace qpid { static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t, management::Args&); }; } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index ad06b6ecaa..6d60f98505 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -62,16 +62,16 @@ Queue::Queue(const string& _name, bool _autodelete, exclusive(0), noLocal(false), persistenceId(0), - policyExceeded(false) + policyExceeded(false), + mgmtObject(0) { if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::getAgent (); - if (agent.get () != 0) + if (agent != 0) { - mgmtObject = management::Queue::shared_ptr - (new management::Queue (agent.get(), this, parent, _name, _store != 0, _autodelete, _owner != 0)); + mgmtObject = new management::Queue (agent, this, parent, _name, _store != 0, _autodelete, _owner != 0); // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. @@ -83,7 +83,7 @@ Queue::Queue(const string& _name, bool _autodelete, Queue::~Queue() { - if (mgmtObject.get () != 0) + if (mgmtObject != 0) mgmtObject->resourceDestroy (); } @@ -128,14 +128,14 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ } else { // if no store then mark as enqueued if (!enqueue(0, msg)){ - if (mgmtObject.get() != 0) { + if (mgmtObject != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); } push(msg); msg->enqueueComplete(); }else { - if (mgmtObject.get() != 0) { + if (mgmtObject != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); @@ -151,7 +151,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObject.get() != 0) { + if (mgmtObject != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); @@ -167,7 +167,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); - if (mgmtObject.get() != 0) { + if (mgmtObject != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); @@ -348,7 +348,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){ } consumerCount++; - if (mgmtObject.get() != 0) + if (mgmtObject != 0) mgmtObject->inc_consumerCount (); } @@ -357,7 +357,7 @@ void Queue::cancel(Consumer& c){ Mutex::ScopedLock locker(consumerLock); consumerCount--; if(exclusive) exclusive = 0; - if (mgmtObject.get() != 0) + if (mgmtObject != 0) mgmtObject->dec_consumerCount (); } @@ -394,7 +394,7 @@ void Queue::pop(){ QueuedMessage& msg = messages.front(); if (policy.get()) policy->dequeued(msg.payload->contentSize()); - if (mgmtObject.get() != 0){ + if (mgmtObject != 0){ mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); if (msg.payload->isPersistent ()){ @@ -522,7 +522,7 @@ void Queue::configure(const FieldTable& _settings) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); - if (mgmtObject.get() != 0) + if (mgmtObject != 0) mgmtObject->set_arguments (_settings); } @@ -574,14 +574,14 @@ uint64_t Queue::getPersistenceId() const void Queue::setPersistenceId(uint64_t _persistenceId) const { - if (mgmtObject.get() != 0 && persistenceId == 0) + if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::getAgent (); agent->addObject (mgmtObject, _persistenceId, 3); if (externalQueueStore) { - ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject(); - if (childObj.get() != 0) + ManagementObject* childObj = externalQueueStore->GetManagementObject(); + if (childObj != 0) childObj->setReference(mgmtObject->getObjectId()); } } @@ -669,8 +669,8 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { externalQueueStore = inst; if (inst) { - ManagementObject::shared_ptr childObj = inst->GetManagementObject(); - if (childObj.get() != 0 && mgmtObject.get() != 0) + ManagementObject* childObj = inst->GetManagementObject(); + if (childObj != 0 && mgmtObject != 0) childObj->setReference(mgmtObject->getObjectId()); } } @@ -696,9 +696,9 @@ void Queue::Guard::wait(sys::Mutex& m) while (count) condition.wait(m); } -ManagementObject::shared_ptr Queue::GetManagementObject (void) const +ManagementObject* Queue::GetManagementObject (void) const { - return dynamic_pointer_cast<ManagementObject> (mgmtObject); + return (ManagementObject*) mgmtObject; } Manageable::status_t Queue::ManagementMethod (uint32_t methodId, diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 2907ca81cd..5b2311ce2c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -96,7 +96,7 @@ namespace qpid { QueueBindings bindings; boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; - management::Queue::shared_ptr mgmtObject; + management::Queue* mgmtObject; void pop(); void push(boost::intrusive_ptr<Message>& msg); @@ -201,7 +201,7 @@ namespace qpid { virtual void setExternalQueueStore(ExternalQueueStore* inst); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); }; diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp index 3466882a48..69ef29c3eb 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.cpp +++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp @@ -74,7 +74,7 @@ void SessionManager::detach(std::auto_ptr<SessionState> session) { session->detach(); if (session->getTimeout() > 0) { session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); - if (session->mgmtObject.get() != 0) + if (session->mgmtObject != 0) session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); detached.push_back(session.release()); // In expiry order eraseExpired(); diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 95145e5d0e..8a17a787a2 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -54,6 +54,7 @@ SessionState::SessionState( adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), + mgmtObject(0), inLastHandler(*this), outLastHandler(*this), inChain(inLastHandler), @@ -61,10 +62,9 @@ SessionState::SessionState( { Manageable* parent = broker.GetVhostObject (); if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - if (agent.get () != 0) { - mgmtObject = management::Session::shared_ptr - (new management::Session (agent.get(), this, parent, getId().getName())); + ManagementAgent* agent = ManagementAgent::getAgent (); + if (agent != 0) { + mgmtObject = new management::Session (agent, this, parent, getId().getName()); mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); agent->addObject (mgmtObject); @@ -79,7 +79,7 @@ SessionState::~SessionState() { // they don't belong in the manager. For now rely on uniqueness of UUIDs. // broker.getSessionManager().forget(getId()); - if (mgmtObject.get () != 0) + if (mgmtObject != 0) mgmtObject->resourceDestroy (); } @@ -104,7 +104,7 @@ void SessionState::detach() { QPID_LOG(debug, getId() << ": detached on broker."); getConnection().outputTasks.removeOutputTask(&semanticState); handler = 0; - if (mgmtObject.get() != 0) + if (mgmtObject != 0) mgmtObject->set_attached (0); } @@ -113,7 +113,7 @@ void SessionState::attach(SessionHandler& h) { Mutex::ScopedLock l(lock); QPID_LOG(debug, getId() << ": attached on broker."); handler = &h; - if (mgmtObject.get() != 0) + if (mgmtObject != 0) { mgmtObject->set_attached (1); mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); @@ -129,9 +129,9 @@ void SessionState::activateOutput() { // FIXME aconway 2008-05-22: should we hold the lock over activateOutput?? } -ManagementObject::shared_ptr SessionState::GetManagementObject (void) const +ManagementObject* SessionState::GetManagementObject (void) const { - return dynamic_pointer_cast<ManagementObject> (mgmtObject); + return (ManagementObject*) mgmtObject; } Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index f2774dadd3..5d18ed161e 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -95,7 +95,7 @@ class SessionState : public qpid::SessionState, DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); @@ -128,7 +128,7 @@ class SessionState : public qpid::SessionState, MessageBuilder msgBuilder; IncompleteMessageList incomplete; IncompleteMessageList::CompletionListener enqueuedOp; - management::Session::shared_ptr mgmtObject; + management::Session* mgmtObject; framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler; framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler; framing::FrameHandler::Chain inChain, outChain; diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp index 107942fab5..5e51beac35 100644 --- a/qpid/cpp/src/qpid/broker/System.cpp +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -18,7 +18,7 @@ // #include "System.h" -#include "qpid/management/ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" #include "qpid/framing/Uuid.h" #include <sys/utsname.h> #include <iostream> @@ -28,11 +28,11 @@ using qpid::management::ManagementAgent; using namespace qpid::broker; using namespace std; -System::System (string _dataDir) +System::System (string _dataDir) : mgmtObject(0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::getAgent (); - if (agent.get () != 0) + if (agent != 0) { framing::Uuid systemId; @@ -62,8 +62,7 @@ System::System (string _dataDir) } } - mgmtObject = management::System::shared_ptr - (new management::System (agent.get(), this, systemId)); + mgmtObject = new management::System (agent, this, systemId); struct utsname _uname; if (uname (&_uname) == 0) { diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h index 65086abec0..ef7c6ba73b 100644 --- a/qpid/cpp/src/qpid/broker/System.h +++ b/qpid/cpp/src/qpid/broker/System.h @@ -32,7 +32,7 @@ class System : public management::Manageable { private: - management::System::shared_ptr mgmtObject; + management::System* mgmtObject; public: @@ -40,7 +40,7 @@ class System : public management::Manageable System (std::string _dataDir); - management::ManagementObject::shared_ptr GetManagementObject (void) const + management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } }; diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index a16421b090..cfd9ef7a9b 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -117,7 +117,7 @@ bool TopicPattern::match(const Tokens& target) const TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -125,7 +125,7 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent) : Exchange(_name, _durable, _args, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -137,9 +137,9 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons } else { Binding::shared_ptr binding (new Binding (routingKey, queue, this)); bindings[routingPattern].push_back(binding); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); } return true; } @@ -158,9 +158,9 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co if(q == qv.end()) return false; qv.erase(q); if(qv.empty()) bindings.erase(bi); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } @@ -187,13 +187,13 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel Binding::vector& qv(i->second); for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ msg.deliverTo((*j)->queue); - if ((*j)->mgmtBinding.get() != 0) + if ((*j)->mgmtBinding != 0) (*j)->mgmtBinding->inc_msgMatched (); } } } - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) { mgmtExchange->inc_msgReceives (); mgmtExchange->inc_byteReceives (msg.contentSize ()); diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp index cfe497c788..6c31628c5f 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.cpp +++ b/qpid/cpp/src/qpid/broker/Vhost.cpp @@ -18,21 +18,20 @@ // #include "Vhost.h" -#include "qpid/management/ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" using namespace qpid::broker; using qpid::management::ManagementAgent; -Vhost::Vhost (management::Manageable* parentBroker) +Vhost::Vhost (management::Manageable* parentBroker) : mgmtObject(0) { if (parentBroker != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent* agent = ManagementAgent::getAgent (); - if (agent.get () != 0) + if (agent != 0) { - mgmtObject = management::Vhost::shared_ptr - (new management::Vhost (agent.get(), this, parentBroker, "/")); + mgmtObject = new management::Vhost (agent, this, parentBroker, "/"); agent->addObject (mgmtObject, 3, 1); } } diff --git a/qpid/cpp/src/qpid/broker/Vhost.h b/qpid/cpp/src/qpid/broker/Vhost.h index b702dcebf0..e56cc61272 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.h +++ b/qpid/cpp/src/qpid/broker/Vhost.h @@ -31,7 +31,7 @@ class Vhost : public management::Manageable { private: - management::Vhost::shared_ptr mgmtObject; + management::Vhost* mgmtObject; public: @@ -39,7 +39,7 @@ class Vhost : public management::Manageable Vhost (management::Manageable* parentBroker); - management::ManagementObject::shared_ptr GetManagementObject (void) const + management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } management::Manageable::status_t ManagementMethod (uint32_t, management::Args&) diff --git a/qpid/cpp/src/qpid/broker/XmlExchange.cpp b/qpid/cpp/src/qpid/broker/XmlExchange.cpp index 8c4d4f79a4..cb0f9a9606 100644 --- a/qpid/cpp/src/qpid/broker/XmlExchange.cpp +++ b/qpid/cpp/src/qpid/broker/XmlExchange.cpp @@ -46,7 +46,7 @@ namespace broker { XmlExchange::XmlExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -54,7 +54,7 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent) : Exchange(_name, _durable, _args, _parent) { - if (mgmtExchange.get() != 0) + if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } @@ -96,8 +96,9 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const bindingsMap[routingKey] = bindings; QPID_LOG(trace, "Bound successfully with query: " << queryText ); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->inc_bindingCount(); } return true; } else{ @@ -127,8 +128,9 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons if (bindings.empty()) { bindingsMap.erase(routingKey); } - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); + ((management::Queue*) queue->GetManagementObject())->dec_bindingCount(); } return true; } else { @@ -203,25 +205,25 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT count++; QPID_LOG(trace, "Delivered to queue" ); - if ((*i)->mgmtBinding.get() != 0) + if ((*i)->mgmtBinding != 0) (*i)->mgmtBinding->inc_msgMatched (); } if(!count){ QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_msgDrops (); mgmtExchange->inc_byteDrops (msg.contentSize ()); } } else { - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_msgRoutes (count); mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); } } - if (mgmtExchange.get() != 0) { + if (mgmtExchange != 0) { mgmtExchange->inc_msgReceives (); mgmtExchange->inc_byteReceives (msg.contentSize ()); } diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h index 25c24588fc..e2b8980465 100644 --- a/qpid/cpp/src/qpid/management/Manageable.h +++ b/qpid/cpp/src/qpid/management/Manageable.h @@ -23,7 +23,6 @@ #include "ManagementObject.h" #include "Args.h" #include <string> -#include <boost/shared_ptr.hpp> namespace qpid { namespace management { @@ -50,9 +49,9 @@ class Manageable // management object. This object is always of a class derived from // the pure-virtual "ManagementObject". // - // This accessor function returns a shared_ptr to the management object. + // This accessor function returns a pointer to the management object. // - virtual ManagementObject::shared_ptr GetManagementObject (void) const = 0; + virtual ManagementObject* GetManagementObject (void) const = 0; // Every "Manageable" object must implement ManagementMethod. This // function is called when a remote management client invokes a method diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h deleted file mode 100644 index c8a1b37823..0000000000 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ /dev/null @@ -1,53 +0,0 @@ -#ifndef _ManagementAgent_ -#define _ManagementAgent_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" - -namespace qpid { -namespace management { - -class ManagementAgent -{ - public: - - virtual ~ManagementAgent () {} - - typedef boost::shared_ptr<ManagementAgent> shared_ptr; - - static shared_ptr getAgent (void); - - virtual int getMaxThreads() = 0; - - virtual void RegisterClass (std::string packageName, - std::string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) = 0; - virtual void addObject (ManagementObject::shared_ptr object, - uint32_t persistId = 0, - uint32_t persistBank = 4) = 0; -}; - -}} - -#endif /*!_ManagementAgent_*/ diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index 271a2ec73c..106033f76f 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -38,12 +38,12 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace std; -ManagementAgent::shared_ptr ManagementBroker::agent; -bool ManagementBroker::enabled = 0; +ManagementAgent* ManagementBroker::agent; +bool ManagementBroker::enabled = 0; ManagementBroker::RemoteAgent::~RemoteAgent () { - if (mgmtObject.get () != 0) + if (mgmtObject != 0) mgmtObject->resourceDestroy (); } @@ -91,7 +91,19 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea } } -ManagementBroker::~ManagementBroker () {} +ManagementBroker::~ManagementBroker () +{ + Mutex::ScopedLock lock (userLock); + + moveNewObjectsLH(); + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + ManagementObject* object = iter->second; + delete object; + } + managementObjects.clear(); +} void ManagementBroker::writeData () { @@ -108,24 +120,25 @@ void ManagementBroker::writeData () void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize) { enabled = 1; - if (agent.get () == 0) - agent = shared_ptr (new ManagementBroker (dataDir, interval, broker, threadPoolSize)); + if (agent == 0) + agent = new ManagementBroker (dataDir, interval, broker, threadPoolSize); } -ManagementAgent::shared_ptr ManagementAgent::getAgent (void) +ManagementAgent* ManagementAgent::getAgent (void) { return ManagementBroker::agent; } void ManagementBroker::shutdown (void) { - if (agent.get () != 0) + if (agent != 0) { - ManagementBroker* broker = (ManagementBroker*) agent.get(); + ManagementBroker* broker = (ManagementBroker*) agent; broker->mExchange.reset (); broker->dExchange.reset (); - agent.reset (); + delete agent; + agent = 0; } } @@ -146,9 +159,9 @@ void ManagementBroker::RegisterClass (string packageName, AddClassLocal (pIter, className, md5Sum, schemaCall); } -void ManagementBroker::addObject (ManagementObject::shared_ptr object, - uint32_t persistId, - uint32_t persistBank) +uint64_t ManagementBroker::addObject (ManagementObject* object, + uint32_t persistId, + uint32_t persistBank) { Mutex::ScopedLock lock (addLock); uint64_t objectId; @@ -168,6 +181,7 @@ void ManagementBroker::addObject (ManagementObject::shared_ptr object, object->setObjectId (objectId); newManagementObjects[objectId] = object; + return objectId; } ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) @@ -281,7 +295,7 @@ void ManagementBroker::PeriodicProcessing (void) iter != managementObjects.end (); iter++) { - ManagementObject::shared_ptr object = iter->second; + ManagementObject* object = iter->second; object->setAllChanged (); } } @@ -293,7 +307,7 @@ void ManagementBroker::PeriodicProcessing (void) iter != managementObjects.end (); iter++) { - ManagementObject::shared_ptr object = iter->second; + ManagementObject* object = iter->second; if (object->getConfigChanged () || object->isDeleted ()) { @@ -633,8 +647,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; - agent->mgmtObject = management::Agent::shared_ptr - (new management::Agent (this, agent)); + agent->mgmtObject = new management::Agent (this, agent); agent->mgmtObject->set_sessionId (sessionId); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); @@ -674,7 +687,7 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui iter != managementObjects.end (); iter++) { - ManagementObject::shared_ptr object = iter->second; + ManagementObject* object = iter->second; if (object->getClassName () == className) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h index 18d30096e5..5e9114c3f4 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementBroker.h @@ -21,18 +21,16 @@ * under the License. * */ - #include "qpid/Options.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Timer.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" -#include "ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" #include "ManagementObject.h" #include "Manageable.h" #include "qpid/management/Agent.h" #include <qpid/framing/AMQFrame.h> -#include <boost/shared_ptr.hpp> namespace qpid { namespace management { @@ -48,9 +46,9 @@ class ManagementBroker : public ManagementAgent virtual ~ManagementBroker (); - static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); - static shared_ptr getAgent (void); - static void shutdown (void); + static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); + static ManagementAgent* getAgent (void); + static void shutdown (void); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, @@ -60,14 +58,19 @@ class ManagementBroker : public ManagementAgent std::string className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); - void addObject (ManagementObject::shared_ptr object, - uint32_t persistId = 0, - uint32_t persistBank = 4); + uint64_t addObject (ManagementObject* object, + uint32_t persistId = 0, + uint32_t persistBank = 4); void clientAdded (void); void dispatchCommand (broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); - + + // Stubs for remote management agent calls + void init (std::string, uint16_t, uint16_t, bool) { assert(0); } + uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } + int getSignalFd () { assert(0); return -1; } + private: friend class ManagementAgent; @@ -86,8 +89,8 @@ class ManagementBroker : public ManagementAgent struct RemoteAgent : public Manageable { uint32_t objIdBank; - Agent::shared_ptr mgmtObject; - ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } + Agent* mgmtObject; + ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); }; @@ -143,7 +146,7 @@ class ManagementBroker : public ManagementAgent ManagementObjectMap managementObjects; ManagementObjectMap newManagementObjects; - static shared_ptr agent; + static ManagementAgent* agent; static bool enabled; framing::Uuid uuid; diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 2528ed4284..74d9571d10 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -21,7 +21,7 @@ #include "Manageable.h" #include "ManagementObject.h" -#include "ManagementAgent.h" +#include "qpid/agent/ManagementAgent.h" #include "qpid/framing/FieldTable.h" using namespace qpid::framing; diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index 732dd14a24..66adabf035 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -25,7 +25,6 @@ #include "qpid/sys/Time.h" #include "qpid/sys/Mutex.h" #include <qpid/framing/Buffer.h> -#include <boost/shared_ptr.hpp> #include <map> namespace qpid { @@ -82,7 +81,6 @@ class ManagementObject void writeTimestamps (qpid::framing::Buffer& buf); public: - typedef boost::shared_ptr<ManagementObject> shared_ptr; typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); ManagementObject (ManagementAgent* _agent, Manageable* _core) : @@ -121,7 +119,7 @@ class ManagementObject inline sys::Mutex& getLock() { return accessLock; } }; -typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap; +typedef std::map<uint64_t,ManagementObject*> ManagementObjectMap; }} |