diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/qpid/framing/Buffer.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 88 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 28 | ||||
-rw-r--r-- | cpp/xml/cluster.xml | 5 |
8 files changed, 127 insertions, 23 deletions
diff --git a/cpp/include/qpid/framing/Buffer.h b/cpp/include/qpid/framing/Buffer.h index 8a6a5c0d5f..50cc6fefe4 100644 --- a/cpp/include/qpid/framing/Buffer.h +++ b/cpp/include/qpid/framing/Buffer.h @@ -43,9 +43,8 @@ class Buffer uint32_t position; uint32_t r_position; - void checkAvailable(uint32_t count) { if (position + count > size) throw OutOfBounds(); } - public: + void checkAvailable(uint32_t count) { if (position + count > size) throw OutOfBounds(); } /** Buffer input/output iterator. * Supports using an amqp_0_10::Codec with a framing::Buffer. diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 320111c2e1..cc245d2f3f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 884125; +const uint32_t Cluster::CLUSTER_VERSION = 896973; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 3a5d121dc1..ea01dd6949 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -40,6 +40,7 @@ #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" #include <boost/current_function.hpp> @@ -478,5 +479,14 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) { findQueue(q)->getListeners().addListener(consumerNumbering[listener]); } +void Connection::managementSchema(const std::string& data) { + management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); + if (!agent) + throw Exception(QPID_MSG("Management schema update but no management agent.")); + framing::Buffer buf(const_cast<char*>(data.data()), data.size()); + agent->importSchemas(buf); + QPID_LOG(debug, cluster << " updated management schemas"); +} + }} // Namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 51e6107bfd..4795d914ed 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -167,6 +167,7 @@ class Connection : OutputInterceptor& getOutput() { return output; } void addQueueListener(const std::string& queue, uint32_t listener); + void managementSchema(const std::string& data); private: struct NullFrameHandler : public framing::FrameHandler { diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index b20cc907a2..d4bd4da7f8 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -138,10 +138,21 @@ void UpdateClient::update() { session.queueDelete(arg::queue=UPDATE); session.close(); - // Update queue listeners: must come after sessions so consumerNumbering is populated. + // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); ClusterConnectionProxy(session).expiryId(expiry.getId()); + + // FIXME aconway 2010-01-08: we should enforce that all cluster members + // have mgmt enabled or none of them do. + + management::ManagementAgent* agent = updaterBroker.getManagementAgent(); + if (agent) { + string schemaData; + agent->exportSchemas(schemaData); + ClusterConnectionProxy(session).managementSchema(schemaData); + } + ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index b5ed4ed405..9620383bce 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -706,8 +706,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); - outBuffer.putShortString(key.name); - outBuffer.putBin128(key.hash); + key.encode(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer (outBuffer, outLen, dExchange, replyToKey); @@ -730,7 +729,7 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) if (writeSchemaCall != 0) writeSchemaCall(buf); else - buf.putRawData(buffer, bufferLen); + buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); } void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -739,8 +738,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, SchemaClassKey key; inBuffer.getShortString (packageName); - inBuffer.getShortString (key.name); - inBuffer.getBin128 (key.hash); + key.decode(inBuffer); QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey << " seq=" << sequence); @@ -780,8 +778,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK inBuffer.record(); inBuffer.getOctet(); inBuffer.getShortString(packageName); - inBuffer.getShortString(key.name); - inBuffer.getBin128(key.hash); + key.decode(inBuffer); inBuffer.restore(); QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); @@ -796,9 +793,8 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); } else { - cIter->second.buffer = (uint8_t*) malloc(length); - cIter->second.bufferLen = length; - inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen); + cIter->second.data.resize(length); + inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length); // Publish a class-indication message Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); @@ -1171,8 +1167,7 @@ void ManagementAgent::encodeClassIndication(Buffer& buf, buf.putOctet((*cIter).second.kind); buf.putShortString((*pIter).first); - buf.putShortString(key.name); - buf.putBin128(key.hash); + key.encode(buf); } size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind) @@ -1293,3 +1288,72 @@ uint64_t ManagementAgent::allocateId(Manageable* object) void ManagementAgent::disallow(const std::string& className, const std::string& methodName, const std::string& message) { disallowed[std::make_pair(className, methodName)] = message; } + +void ManagementAgent::SchemaClassKey::encode(framing::Buffer& buffer) const { + buffer.checkAvailable(encodedSize()); + buffer.putShortString(name); + buffer.putBin128(hash); +} + +void ManagementAgent::SchemaClassKey::decode(framing::Buffer& buffer) { + buffer.checkAvailable(encodedSize()); + buffer.getShortString(name); + buffer.getBin128(hash); +} + +uint32_t ManagementAgent::SchemaClassKey::encodedSize() const { + return 1 + name.size() + 16 /* bin128 */; +} + +void ManagementAgent::SchemaClass::encode(framing::Buffer& outBuf) const { + outBuf.checkAvailable(encodedSize()); + outBuf.putOctet(kind); + outBuf.putLong(pendingSequence); + outBuf.putLongString(data); +} + +void ManagementAgent::SchemaClass::decode(framing::Buffer& inBuf) { + inBuf.checkAvailable(encodedSize()); + kind = inBuf.getOctet(); + pendingSequence = inBuf.getLong(); + inBuf.getLongString(data); +} + +uint32_t ManagementAgent::SchemaClass::encodedSize() const { + return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size(); +} + +void ManagementAgent::exportSchemas(std::string& out) { + out.clear(); + for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) { + string name = i->first; + const ClassMap& classes = i ->second; + for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) { + const SchemaClassKey& key = j->first; + const SchemaClass& klass = j->second; + if (klass.writeSchemaCall == 0) { // Ignore built-in schemas. + // Encode name, schema-key, schema-class + size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize(); + size_t end = out.size(); + out.resize(end + encodedSize); + framing::Buffer outBuf(&out[end], encodedSize); + outBuf.putShortString(name); + key.encode(outBuf); + klass.encode(outBuf); + } + } + } +} + +void ManagementAgent::importSchemas(framing::Buffer& inBuf) { + while (inBuf.available()) { + string package; + SchemaClassKey key; + SchemaClass klass; + inBuf.getShortString(package); + key.decode(inBuf); + klass.decode(inBuf); + packages[package][key] = klass; + } +} + diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index c64c073d8c..1c06e5896b 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -97,7 +97,13 @@ public: /** Disallow a method. Attempts to call it will receive an exception with message. */ void disallow(const std::string& className, const std::string& methodName, const std::string& message); - + + /** Serialize my schemas as a binary blob into schemaOut */ + void exportSchemas(std::string& schemaOut); + + /** Decode a serialized schemas and add to my schema cache */ + void importSchemas(framing::Buffer& inBuf); + private: struct Periodic : public qpid::sys::TimerTask { @@ -140,6 +146,10 @@ private: { std::string name; uint8_t hash[16]; + + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedSize() const; }; struct SchemaClassKeyComp @@ -156,20 +166,24 @@ private: } }; + struct SchemaClass { uint8_t kind; ManagementObject::writeSchemaCall_t writeSchemaCall; + std::string data; uint32_t pendingSequence; - size_t bufferLen; - uint8_t* buffer; - SchemaClass(uint8_t _kind, uint32_t seq) : - kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} + SchemaClass(uint8_t _kind=0, uint32_t seq=0) : + kind(_kind), writeSchemaCall(0), pendingSequence(seq) {} SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : - kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} - bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } + kind(_kind), writeSchemaCall(call), pendingSequence(0) {} + bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); } void appendSchema (framing::Buffer& buf); + + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedSize() const; }; typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 2155b6374e..0359514294 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -226,5 +226,10 @@ <field name="queue" type="str8"/> <field name="consumer" type="uint32"/> </control> + + <!-- Replicate management agent schema --> + <control name="management-schema" code="0x35"> + <field name="data" type="vbin32"/> + </control> </class> </amqp> |