diff options
author | Alan Conway <aconway@apache.org> | 2010-01-11 17:23:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-01-11 17:23:18 +0000 |
commit | 892e84f39a40a3868ca5630371784e883127f21a (patch) | |
tree | 913127cae64803e5b00589bf5257e3729c5c66e7 /cpp | |
parent | 7a3841889a648eac5f57305c80f1f25a01a115ee (diff) | |
download | qpid-python-892e84f39a40a3868ca5630371784e883127f21a.tar.gz |
Fix broker crash "confirmed N but only sent M" with managed agents running.
The broker's ManagementAgent caches schemas from managed agents.
This cache was not being replicated to new cluster members.
If an agent such as sesame was running and connected to a newly-joined
broker, that broker could send schema request messages which were not
sent by other brokers that had the schema in cache. This resulted in
the other brokers exiting with a "confirmed N but only sent M"
message.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@897955 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/qpid/framing/Buffer.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 88 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 28 | ||||
-rw-r--r-- | cpp/xml/cluster.xml | 5 |
8 files changed, 127 insertions, 23 deletions
diff --git a/cpp/include/qpid/framing/Buffer.h b/cpp/include/qpid/framing/Buffer.h index 8a6a5c0d5f..50cc6fefe4 100644 --- a/cpp/include/qpid/framing/Buffer.h +++ b/cpp/include/qpid/framing/Buffer.h @@ -43,9 +43,8 @@ class Buffer uint32_t position; uint32_t r_position; - void checkAvailable(uint32_t count) { if (position + count > size) throw OutOfBounds(); } - public: + void checkAvailable(uint32_t count) { if (position + count > size) throw OutOfBounds(); } /** Buffer input/output iterator. * Supports using an amqp_0_10::Codec with a framing::Buffer. diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 320111c2e1..cc245d2f3f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 884125; +const uint32_t Cluster::CLUSTER_VERSION = 896973; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 3a5d121dc1..ea01dd6949 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -40,6 +40,7 @@ #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" #include <boost/current_function.hpp> @@ -478,5 +479,14 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) { findQueue(q)->getListeners().addListener(consumerNumbering[listener]); } +void Connection::managementSchema(const std::string& data) { + management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); + if (!agent) + throw Exception(QPID_MSG("Management schema update but no management agent.")); + framing::Buffer buf(const_cast<char*>(data.data()), data.size()); + agent->importSchemas(buf); + QPID_LOG(debug, cluster << " updated management schemas"); +} + }} // Namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 51e6107bfd..4795d914ed 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -167,6 +167,7 @@ class Connection : OutputInterceptor& getOutput() { return output; } void addQueueListener(const std::string& queue, uint32_t listener); + void managementSchema(const std::string& data); private: struct NullFrameHandler : public framing::FrameHandler { diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index b20cc907a2..d4bd4da7f8 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -138,10 +138,21 @@ void UpdateClient::update() { session.queueDelete(arg::queue=UPDATE); session.close(); - // Update queue listeners: must come after sessions so consumerNumbering is populated. + // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); ClusterConnectionProxy(session).expiryId(expiry.getId()); + + // FIXME aconway 2010-01-08: we should enforce that all cluster members + // have mgmt enabled or none of them do. + + management::ManagementAgent* agent = updaterBroker.getManagementAgent(); + if (agent) { + string schemaData; + agent->exportSchemas(schemaData); + ClusterConnectionProxy(session).managementSchema(schemaData); + } + ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index b5ed4ed405..9620383bce 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -706,8 +706,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); - outBuffer.putShortString(key.name); - outBuffer.putBin128(key.hash); + key.encode(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer (outBuffer, outLen, dExchange, replyToKey); @@ -730,7 +729,7 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) if (writeSchemaCall != 0) writeSchemaCall(buf); else - buf.putRawData(buffer, bufferLen); + buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); } void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -739,8 +738,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, SchemaClassKey key; inBuffer.getShortString (packageName); - inBuffer.getShortString (key.name); - inBuffer.getBin128 (key.hash); + key.decode(inBuffer); QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey << " seq=" << sequence); @@ -780,8 +778,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK inBuffer.record(); inBuffer.getOctet(); inBuffer.getShortString(packageName); - inBuffer.getShortString(key.name); - inBuffer.getBin128(key.hash); + key.decode(inBuffer); inBuffer.restore(); QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); @@ -796,9 +793,8 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); } else { - cIter->second.buffer = (uint8_t*) malloc(length); - cIter->second.bufferLen = length; - inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen); + cIter->second.data.resize(length); + inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length); // Publish a class-indication message Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); @@ -1171,8 +1167,7 @@ void ManagementAgent::encodeClassIndication(Buffer& buf, buf.putOctet((*cIter).second.kind); buf.putShortString((*pIter).first); - buf.putShortString(key.name); - buf.putBin128(key.hash); + key.encode(buf); } size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind) @@ -1293,3 +1288,72 @@ uint64_t ManagementAgent::allocateId(Manageable* object) void ManagementAgent::disallow(const std::string& className, const std::string& methodName, const std::string& message) { disallowed[std::make_pair(className, methodName)] = message; } + +void ManagementAgent::SchemaClassKey::encode(framing::Buffer& buffer) const { + buffer.checkAvailable(encodedSize()); + buffer.putShortString(name); + buffer.putBin128(hash); +} + +void ManagementAgent::SchemaClassKey::decode(framing::Buffer& buffer) { + buffer.checkAvailable(encodedSize()); + buffer.getShortString(name); + buffer.getBin128(hash); +} + +uint32_t ManagementAgent::SchemaClassKey::encodedSize() const { + return 1 + name.size() + 16 /* bin128 */; +} + +void ManagementAgent::SchemaClass::encode(framing::Buffer& outBuf) const { + outBuf.checkAvailable(encodedSize()); + outBuf.putOctet(kind); + outBuf.putLong(pendingSequence); + outBuf.putLongString(data); +} + +void ManagementAgent::SchemaClass::decode(framing::Buffer& inBuf) { + inBuf.checkAvailable(encodedSize()); + kind = inBuf.getOctet(); + pendingSequence = inBuf.getLong(); + inBuf.getLongString(data); +} + +uint32_t ManagementAgent::SchemaClass::encodedSize() const { + return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size(); +} + +void ManagementAgent::exportSchemas(std::string& out) { + out.clear(); + for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) { + string name = i->first; + const ClassMap& classes = i ->second; + for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) { + const SchemaClassKey& key = j->first; + const SchemaClass& klass = j->second; + if (klass.writeSchemaCall == 0) { // Ignore built-in schemas. + // Encode name, schema-key, schema-class + size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize(); + size_t end = out.size(); + out.resize(end + encodedSize); + framing::Buffer outBuf(&out[end], encodedSize); + outBuf.putShortString(name); + key.encode(outBuf); + klass.encode(outBuf); + } + } + } +} + +void ManagementAgent::importSchemas(framing::Buffer& inBuf) { + while (inBuf.available()) { + string package; + SchemaClassKey key; + SchemaClass klass; + inBuf.getShortString(package); + key.decode(inBuf); + klass.decode(inBuf); + packages[package][key] = klass; + } +} + diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index c64c073d8c..1c06e5896b 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -97,7 +97,13 @@ public: /** Disallow a method. Attempts to call it will receive an exception with message. */ void disallow(const std::string& className, const std::string& methodName, const std::string& message); - + + /** Serialize my schemas as a binary blob into schemaOut */ + void exportSchemas(std::string& schemaOut); + + /** Decode a serialized schemas and add to my schema cache */ + void importSchemas(framing::Buffer& inBuf); + private: struct Periodic : public qpid::sys::TimerTask { @@ -140,6 +146,10 @@ private: { std::string name; uint8_t hash[16]; + + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedSize() const; }; struct SchemaClassKeyComp @@ -156,20 +166,24 @@ private: } }; + struct SchemaClass { uint8_t kind; ManagementObject::writeSchemaCall_t writeSchemaCall; + std::string data; uint32_t pendingSequence; - size_t bufferLen; - uint8_t* buffer; - SchemaClass(uint8_t _kind, uint32_t seq) : - kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} + SchemaClass(uint8_t _kind=0, uint32_t seq=0) : + kind(_kind), writeSchemaCall(0), pendingSequence(seq) {} SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : - kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} - bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } + kind(_kind), writeSchemaCall(call), pendingSequence(0) {} + bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); } void appendSchema (framing::Buffer& buf); + + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedSize() const; }; typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 2155b6374e..0359514294 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -226,5 +226,10 @@ <field name="queue" type="str8"/> <field name="consumer" type="uint32"/> </control> + + <!-- Replicate management agent schema --> + <control name="management-schema" code="0x35"> + <field name="data" type="vbin32"/> + </control> </class> </amqp> |