diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 56 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/management/IdAllocator.h | 42 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 8 |
13 files changed, 167 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 66ee6281c6..b7446a2220 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -48,7 +48,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_, uint64_t objectId) : ConnectionState(out_, broker_), adapter(*this, isLink_), isLink(isLink_), @@ -70,9 +70,10 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std // TODO set last bool true if system connection - if (agent != 0) + if (agent != 0) { mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); - agent->addObject(mgmtObject); + agent->addObject(mgmtObject, objectId); + } ConnectionState::setUrl(mgmtId); } } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 559cd4cfe3..80d828584d 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -64,7 +64,7 @@ class Connection : public sys::ConnectionInputHandler, public RefCounted { public: - Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false); + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0); ~Connection (); /** Get the SessionHandler for channel. Create if it does not already exist */ diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 34673bdab3..53c49bf0ce 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -22,6 +22,7 @@ #include "Exchange.h" #include "ExchangeRegistry.h" #include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" #include "DeliverableMessage.h" @@ -32,6 +33,7 @@ using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::sys::Mutex; using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -109,12 +111,14 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); mgmtExchange->set_arguments(args); if (!durable) { - if (name == "") + if (name == "") { agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID - else if (name == "qpid.management") + } else if (name == "qpid.management") { agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID - else - agent->addObject (mgmtExchange); + } else { + ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); + agent->addObject (mgmtExchange, mb ? mb->allocateId(this) : 0); + } } } } @@ -245,7 +249,8 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang (agent, this, (Manageable*) parent, queueId, key, args); if (!origin.empty()) mgmtBinding->set_origin(origin); - agent->addObject (mgmtBinding); + ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); + agent->addObject (mgmtBinding, mb ? mb->allocateId(this) : 0); } } } diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 5de3e98bc0..488549bbf6 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -39,6 +39,22 @@ namespace broker { class ExchangeRegistry; class Exchange : public PersistableExchange, public management::Manageable { +public: + struct Binding : public management::Manageable { + typedef boost::shared_ptr<Binding> shared_ptr; + typedef std::vector<Binding::shared_ptr> vector; + + Queue::shared_ptr queue; + const std::string key; + const framing::FieldTable args; + qmf::org::apache::qpid::broker::Binding* mgmtBinding; + + Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0, + framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); + ~Binding(); + management::ManagementObject* GetManagementObject() const; + }; + private: const std::string name; const bool durable; @@ -64,20 +80,6 @@ protected: void routeIVE(); - struct Binding : public management::Manageable { - typedef boost::shared_ptr<Binding> shared_ptr; - typedef std::vector<Binding::shared_ptr> vector; - - Queue::shared_ptr queue; - const std::string key; - const framing::FieldTable args; - qmf::org::apache::qpid::broker::Binding* mgmtBinding; - - Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0, - framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); - ~Binding(); - management::ManagementObject* GetManagementObject() const; - }; struct MatchQueue { const Queue::shared_ptr queue; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 962b463571..d459c64c54 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -30,6 +30,7 @@ #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" @@ -46,6 +47,7 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -103,8 +105,10 @@ Queue::Queue(const string& _name, bool _autodelete, // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. - if (store == 0) - agent->addObject (mgmtObject); + if (store == 0) { + ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); + agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); + } } } } diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index e17a813db7..0a24a39d38 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementBroker.h" #include <boost/bind.hpp> #include <boost/lexical_cast.hpp> @@ -41,6 +42,7 @@ using namespace framing; using sys::Mutex; using boost::intrusive_ptr; using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -65,7 +67,8 @@ SessionState::SessionState( mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); - agent->addObject (mgmtObject); + ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); + agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); } } attach(h); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 6e1d275162..79c34d6873 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -21,13 +21,21 @@ #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ConnectionCodec.h" +#include "qpid/cluster/DumpClient.h" #include "qpid/broker/Broker.h" #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/shared_ptr.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementBroker.h" +#include "qpid/management/IdAllocator.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/SessionState.h" + #include <boost/utility/in_place_factory.hpp> #include <boost/scoped_ptr.hpp> @@ -36,6 +44,9 @@ namespace cluster { using namespace std; using broker::Broker; +using management::IdAllocator; +using management::ManagementAgent; +using management::ManagementBroker; struct ClusterValues { string name; @@ -76,6 +87,46 @@ struct ClusterOptions : public Options { } }; +struct DumpClientIdAllocator : management::IdAllocator +{ + qpid::sys::AtomicValue<uint64_t> sequence; + + DumpClientIdAllocator() : sequence(0x4000000000000000LL) {} + + uint64_t getIdFor(management::Manageable* m) + { + if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) { + return ++sequence; + } else { + return 0; + } + } + + bool isDumpQueue(management::Manageable* manageable) + { + qpid::broker::Queue* queue = dynamic_cast<qpid::broker::Queue*>(manageable); + return queue && queue->getName() == DumpClient::DUMP; + } + + bool isDumpExchange(management::Manageable* manageable) + { + qpid::broker::Exchange* exchange = dynamic_cast<qpid::broker::Exchange*>(manageable); + return exchange && exchange->getName() == DumpClient::DUMP; + } + + bool isDumpSession(management::Manageable* manageable) + { + broker::SessionState* session = dynamic_cast<broker::SessionState*>(manageable); + return session && session->getId().getName() == DumpClient::DUMP; + } + + bool isDumpBinding(management::Manageable* manageable) + { + broker::Exchange::Binding* binding = dynamic_cast<broker::Exchange::Binding*>(manageable); + return binding && binding->queue->getName() == DumpClient::DUMP; + } +}; + struct ClusterPlugin : public Plugin { ClusterValues values; @@ -102,6 +153,11 @@ struct ClusterPlugin : public Plugin { boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); broker->getExchanges().registerExchange(cluster->getFailoverExchange()); + ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance()); + if (mgmt) { + std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator()); + mgmt->setAllocator(allocator); + } } void earlyInitialize(Plugin::Target&) {} diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index ac4b9dcdf2..d05baffe3a 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -69,7 +69,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0), + connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0), expectProtocolHeader(isLink) { init(); } @@ -396,5 +396,7 @@ void Connection::queue(const std::string& encoded) { QPID_LOG(debug, cluster << " decoded queue " << q->getName()); } +qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 5d46b7e81d..29dee5eda4 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -31,6 +31,7 @@ #include "qpid/broker/Connection.h" #include "qpid/amqp_0_10/Connection.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/framing/FrameDecoder.h" @@ -173,6 +174,8 @@ class Connection : boost::shared_ptr<broker::TxBuffer> txBuffer; int readCredit; bool expectProtocolHeader; + + static qpid::sys::AtomicValue<uint64_t> catchUpId; friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 3f3212470d..00328eb310 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -94,7 +94,7 @@ DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url done(ok), failed(fail) { connection.open(url); - session = connection.newSession("dump_shared"); + session = connection.newSession(DUMP); } DumpClient::~DumpClient() {} diff --git a/cpp/src/qpid/management/IdAllocator.h b/cpp/src/qpid/management/IdAllocator.h new file mode 100644 index 0000000000..6fbc99afff --- /dev/null +++ b/cpp/src/qpid/management/IdAllocator.h @@ -0,0 +1,42 @@ +#ifndef QPID_MANAGEMENT_IDALLOCATOR_H +#define QPID_MANAGEMENT_IDALLOCATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" + +namespace qpid { +namespace management { + +/** + * Interface through which plugins etc can control the mgmt object id + * allocation for special cases + */ +struct IdAllocator +{ + virtual uint64_t getIdFor(Manageable* object) = 0; + virtual ~IdAllocator() {} +}; + +}} // namespace qpid::management + +#endif /*!QPID_MANAGEMENT_IDALLOCATOR_H*/ diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 2175bc4676..0f96e97fb0 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -20,6 +20,7 @@ */ #include "ManagementBroker.h" +#include "IdAllocator.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> @@ -1135,3 +1136,16 @@ size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) inBuffer.restore(); // restore original position return end - start; } + +void ManagementBroker::setAllocator(std::auto_ptr<IdAllocator> a) +{ + Mutex::ScopedLock lock (addLock); + allocator = a; +} + +uint64_t ManagementBroker::allocateId(Manageable* object) +{ + Mutex::ScopedLock lock (addLock); + if (allocator.get()) return allocator->getIdFor(object); + return 0; +} diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 59dfb98596..f65d6a345e 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -32,10 +32,13 @@ #include "Manageable.h" #include "qmf/org/apache/qpid/broker/Agent.h" #include <qpid/framing/AMQFrame.h> +#include <memory> namespace qpid { namespace management { +struct IdAllocator; + class ManagementBroker : public ManagementAgent { private: @@ -43,7 +46,6 @@ private: int threadPoolSize; public: - ManagementBroker (); virtual ~ManagementBroker (); @@ -78,6 +80,8 @@ public: uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } int getSignalFd () { assert(0); return -1; } + void setAllocator(std::auto_ptr<IdAllocator> allocator); + uint64_t allocateId(Manageable* object); private: friend class ManagementAgent; @@ -179,6 +183,8 @@ private: uint32_t nextRequestSequence; bool clientWasAdded; + std::auto_ptr<IdAllocator> allocator; + # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; char outputBuffer[MA_BUFFER_SIZE]; |