diff options
author | Gordon Sim <gsim@apache.org> | 2009-01-23 21:55:15 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-01-23 21:55:15 +0000 |
commit | b849efb083c88de6c1932d0f7a87a7c500dbd3d6 (patch) | |
tree | 727bdbdb8c3701d90a0d31e2f3e42b26d10c9f9d /cpp/src | |
parent | 85428cec6a090c58c34453dee8b38ac20a619a96 (diff) | |
download | qpid-python-b849efb083c88de6c1932d0f7a87a7c500dbd3d6.tar.gz |
Use special management ids for objects used in state transfer to new members. This prevents the ids getting out of sync across the cluster and allows management methods to be used reliably.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737203 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 56 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/management/IdAllocator.h | 42 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 8 |
13 files changed, 167 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 66ee6281c6..b7446a2220 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -48,7 +48,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_, uint64_t objectId) : ConnectionState(out_, broker_), adapter(*this, isLink_), isLink(isLink_), @@ -70,9 +70,10 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std // TODO set last bool true if system connection - if (agent != 0) + if (agent != 0) { mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); - agent->addObject(mgmtObject); + agent->addObject(mgmtObject, objectId); + } ConnectionState::setUrl(mgmtId); } } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 559cd4cfe3..80d828584d 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -64,7 +64,7 @@ class Connection : public sys::ConnectionInputHandler, public RefCounted { public: - Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false); + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0); ~Connection (); /** Get the SessionHandler for channel. Create if it does not already exist */ diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 34673bdab3..53c49bf0ce 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -22,6 +22,7 @@ #include "Exchange.h" #include "ExchangeRegistry.h" #include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" #include "DeliverableMessage.h" @@ -32,6 +33,7 @@ using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::sys::Mutex; using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -109,12 +111,14 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); mgmtExchange->set_arguments(args); if (!durable) { - if (name == "") + if (name == "") { agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID - else if (name == "qpid.management") + } else if (name == "qpid.management") { agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID - else - agent->addObject (mgmtExchange); + } else { + ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); + agent->addObject (mgmtExchange, mb ? mb->allocateId(this) : 0); + } } } } @@ -245,7 +249,8 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang (agent, this, (Manageable*) parent, queueId, key, args); if (!origin.empty()) mgmtBinding->set_origin(origin); - agent->addObject (mgmtBinding); + ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); + agent->addObject (mgmtBinding, mb ? mb->allocateId(this) : 0); } } } diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 5de3e98bc0..488549bbf6 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -39,6 +39,22 @@ namespace broker { class ExchangeRegistry; class Exchange : public PersistableExchange, public management::Manageable { +public: + struct Binding : public management::Manageable { + typedef boost::shared_ptr<Binding> shared_ptr; + typedef std::vector<Binding::shared_ptr> vector; + + Queue::shared_ptr queue; + const std::string key; + const framing::FieldTable args; + qmf::org::apache::qpid::broker::Binding* mgmtBinding; + + Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0, + framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); + ~Binding(); + management::ManagementObject* GetManagementObject() const; + }; + private: const std::string name; const bool durable; @@ -64,20 +80,6 @@ protected: void routeIVE(); - struct Binding : public management::Manageable { - typedef boost::shared_ptr<Binding> shared_ptr; - typedef std::vector<Binding::shared_ptr> vector; - - Queue::shared_ptr queue; - const std::string key; - const framing::FieldTable args; - qmf::org::apache::qpid::broker::Binding* mgmtBinding; - - Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0, - framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); - ~Binding(); - management::ManagementObject* GetManagementObject() const; - }; struct MatchQueue { const Queue::shared_ptr queue; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 962b463571..d459c64c54 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -30,6 +30,7 @@ #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" @@ -46,6 +47,7 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -103,8 +105,10 @@ Queue::Queue(const string& _name, bool _autodelete, // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. - if (store == 0) - agent->addObject (mgmtObject); + if (store == 0) { + ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); + agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); + } } } } diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index e17a813db7..0a24a39d38 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementBroker.h" #include <boost/bind.hpp> #include <boost/lexical_cast.hpp> @@ -41,6 +42,7 @@ using namespace framing; using sys::Mutex; using boost::intrusive_ptr; using qpid::management::ManagementAgent; +using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -65,7 +67,8 @@ SessionState::SessionState( mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); - agent->addObject (mgmtObject); + ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); + agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); } } attach(h); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 6e1d275162..79c34d6873 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -21,13 +21,21 @@ #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ConnectionCodec.h" +#include "qpid/cluster/DumpClient.h" #include "qpid/broker/Broker.h" #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/shared_ptr.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementBroker.h" +#include "qpid/management/IdAllocator.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/SessionState.h" + #include <boost/utility/in_place_factory.hpp> #include <boost/scoped_ptr.hpp> @@ -36,6 +44,9 @@ namespace cluster { using namespace std; using broker::Broker; +using management::IdAllocator; +using management::ManagementAgent; +using management::ManagementBroker; struct ClusterValues { string name; @@ -76,6 +87,46 @@ struct ClusterOptions : public Options { } }; +struct DumpClientIdAllocator : management::IdAllocator +{ + qpid::sys::AtomicValue<uint64_t> sequence; + + DumpClientIdAllocator() : sequence(0x4000000000000000LL) {} + + uint64_t getIdFor(management::Manageable* m) + { + if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) { + return ++sequence; + } else { + return 0; + } + } + + bool isDumpQueue(management::Manageable* manageable) + { + qpid::broker::Queue* queue = dynamic_cast<qpid::broker::Queue*>(manageable); + return queue && queue->getName() == DumpClient::DUMP; + } + + bool isDumpExchange(management::Manageable* manageable) + { + qpid::broker::Exchange* exchange = dynamic_cast<qpid::broker::Exchange*>(manageable); + return exchange && exchange->getName() == DumpClient::DUMP; + } + + bool isDumpSession(management::Manageable* manageable) + { + broker::SessionState* session = dynamic_cast<broker::SessionState*>(manageable); + return session && session->getId().getName() == DumpClient::DUMP; + } + + bool isDumpBinding(management::Manageable* manageable) + { + broker::Exchange::Binding* binding = dynamic_cast<broker::Exchange::Binding*>(manageable); + return binding && binding->queue->getName() == DumpClient::DUMP; + } +}; + struct ClusterPlugin : public Plugin { ClusterValues values; @@ -102,6 +153,11 @@ struct ClusterPlugin : public Plugin { boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); broker->getExchanges().registerExchange(cluster->getFailoverExchange()); + ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance()); + if (mgmt) { + std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator()); + mgmt->setAllocator(allocator); + } } void earlyInitialize(Plugin::Target&) {} diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index ac4b9dcdf2..d05baffe3a 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -69,7 +69,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0), + connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0), expectProtocolHeader(isLink) { init(); } @@ -396,5 +396,7 @@ void Connection::queue(const std::string& encoded) { QPID_LOG(debug, cluster << " decoded queue " << q->getName()); } +qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 5d46b7e81d..29dee5eda4 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -31,6 +31,7 @@ #include "qpid/broker/Connection.h" #include "qpid/amqp_0_10/Connection.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/framing/FrameDecoder.h" @@ -173,6 +174,8 @@ class Connection : boost::shared_ptr<broker::TxBuffer> txBuffer; int readCredit; bool expectProtocolHeader; + + static qpid::sys::AtomicValue<uint64_t> catchUpId; friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 3f3212470d..00328eb310 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -94,7 +94,7 @@ DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url done(ok), failed(fail) { connection.open(url); - session = connection.newSession("dump_shared"); + session = connection.newSession(DUMP); } DumpClient::~DumpClient() {} diff --git a/cpp/src/qpid/management/IdAllocator.h b/cpp/src/qpid/management/IdAllocator.h new file mode 100644 index 0000000000..6fbc99afff --- /dev/null +++ b/cpp/src/qpid/management/IdAllocator.h @@ -0,0 +1,42 @@ +#ifndef QPID_MANAGEMENT_IDALLOCATOR_H +#define QPID_MANAGEMENT_IDALLOCATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" + +namespace qpid { +namespace management { + +/** + * Interface through which plugins etc can control the mgmt object id + * allocation for special cases + */ +struct IdAllocator +{ + virtual uint64_t getIdFor(Manageable* object) = 0; + virtual ~IdAllocator() {} +}; + +}} // namespace qpid::management + +#endif /*!QPID_MANAGEMENT_IDALLOCATOR_H*/ diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 2175bc4676..0f96e97fb0 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -20,6 +20,7 @@ */ #include "ManagementBroker.h" +#include "IdAllocator.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> @@ -1135,3 +1136,16 @@ size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) inBuffer.restore(); // restore original position return end - start; } + +void ManagementBroker::setAllocator(std::auto_ptr<IdAllocator> a) +{ + Mutex::ScopedLock lock (addLock); + allocator = a; +} + +uint64_t ManagementBroker::allocateId(Manageable* object) +{ + Mutex::ScopedLock lock (addLock); + if (allocator.get()) return allocator->getIdFor(object); + return 0; +} diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 59dfb98596..f65d6a345e 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -32,10 +32,13 @@ #include "Manageable.h" #include "qmf/org/apache/qpid/broker/Agent.h" #include <qpid/framing/AMQFrame.h> +#include <memory> namespace qpid { namespace management { +struct IdAllocator; + class ManagementBroker : public ManagementAgent { private: @@ -43,7 +46,6 @@ private: int threadPoolSize; public: - ManagementBroker (); virtual ~ManagementBroker (); @@ -78,6 +80,8 @@ public: uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } int getSignalFd () { assert(0); return -1; } + void setAllocator(std::auto_ptr<IdAllocator> allocator); + uint64_t allocateId(Manageable* object); private: friend class ManagementAgent; @@ -179,6 +183,8 @@ private: uint32_t nextRequestSequence; bool clientWasAdded; + std::auto_ptr<IdAllocator> allocator; + # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; char outputBuffer[MA_BUFFER_SIZE]; |