summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/qpid/framing/Buffer.h3
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp10
-rw-r--r--cpp/src/qpid/cluster/Connection.h1
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp13
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp88
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h28
-rw-r--r--cpp/xml/cluster.xml5
8 files changed, 127 insertions, 23 deletions
diff --git a/cpp/include/qpid/framing/Buffer.h b/cpp/include/qpid/framing/Buffer.h
index 8a6a5c0d5f..50cc6fefe4 100644
--- a/cpp/include/qpid/framing/Buffer.h
+++ b/cpp/include/qpid/framing/Buffer.h
@@ -43,9 +43,8 @@ class Buffer
uint32_t position;
uint32_t r_position;
- void checkAvailable(uint32_t count) { if (position + count > size) throw OutOfBounds(); }
-
public:
+ void checkAvailable(uint32_t count) { if (position + count > size) throw OutOfBounds(); }
/** Buffer input/output iterator.
* Supports using an amqp_0_10::Codec with a framing::Buffer.
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 320111c2e1..cc245d2f3f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 884125;
+const uint32_t Cluster::CLUSTER_VERSION = 896973;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 3a5d121dc1..ea01dd6949 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -40,6 +40,7 @@
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
#include <boost/current_function.hpp>
@@ -478,5 +479,14 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) {
findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
}
+void Connection::managementSchema(const std::string& data) {
+ management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
+ if (!agent)
+ throw Exception(QPID_MSG("Management schema update but no management agent."));
+ framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+ agent->importSchemas(buf);
+ QPID_LOG(debug, cluster << " updated management schemas");
+}
+
}} // Namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 51e6107bfd..4795d914ed 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -167,6 +167,7 @@ class Connection :
OutputInterceptor& getOutput() { return output; }
void addQueueListener(const std::string& queue, uint32_t listener);
+ void managementSchema(const std::string& data);
private:
struct NullFrameHandler : public framing::FrameHandler {
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index b20cc907a2..d4bd4da7f8 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -138,10 +138,21 @@ void UpdateClient::update() {
session.queueDelete(arg::queue=UPDATE);
session.close();
- // Update queue listeners: must come after sessions so consumerNumbering is populated.
+ // Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
ClusterConnectionProxy(session).expiryId(expiry.getId());
+
+ // FIXME aconway 2010-01-08: we should enforce that all cluster members
+ // have mgmt enabled or none of them do.
+
+ management::ManagementAgent* agent = updaterBroker.getManagementAgent();
+ if (agent) {
+ string schemaData;
+ agent->exportSchemas(schemaData);
+ ClusterConnectionProxy(session).managementSchema(schemaData);
+ }
+
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index b5ed4ed405..9620383bce 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -706,8 +706,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin
encodeHeader (outBuffer, 'S', sequence);
outBuffer.putShortString(packageName);
- outBuffer.putShortString(key.name);
- outBuffer.putBin128(key.hash);
+ key.encode(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
@@ -730,7 +729,7 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
if (writeSchemaCall != 0)
writeSchemaCall(buf);
else
- buf.putRawData(buffer, bufferLen);
+ buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
}
void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -739,8 +738,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey,
SchemaClassKey key;
inBuffer.getShortString (packageName);
- inBuffer.getShortString (key.name);
- inBuffer.getBin128 (key.hash);
+ key.decode(inBuffer);
QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey << " seq=" << sequence);
@@ -780,8 +778,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK
inBuffer.record();
inBuffer.getOctet();
inBuffer.getShortString(packageName);
- inBuffer.getShortString(key.name);
- inBuffer.getBin128(key.hash);
+ key.decode(inBuffer);
inBuffer.restore();
QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
@@ -796,9 +793,8 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK
QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name);
cMap.erase(key);
} else {
- cIter->second.buffer = (uint8_t*) malloc(length);
- cIter->second.bufferLen = length;
- inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen);
+ cIter->second.data.resize(length);
+ inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length);
// Publish a class-indication message
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
@@ -1171,8 +1167,7 @@ void ManagementAgent::encodeClassIndication(Buffer& buf,
buf.putOctet((*cIter).second.kind);
buf.putShortString((*pIter).first);
- buf.putShortString(key.name);
- buf.putBin128(key.hash);
+ key.encode(buf);
}
size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind)
@@ -1293,3 +1288,72 @@ uint64_t ManagementAgent::allocateId(Manageable* object)
void ManagementAgent::disallow(const std::string& className, const std::string& methodName, const std::string& message) {
disallowed[std::make_pair(className, methodName)] = message;
}
+
+void ManagementAgent::SchemaClassKey::encode(framing::Buffer& buffer) const {
+ buffer.checkAvailable(encodedSize());
+ buffer.putShortString(name);
+ buffer.putBin128(hash);
+}
+
+void ManagementAgent::SchemaClassKey::decode(framing::Buffer& buffer) {
+ buffer.checkAvailable(encodedSize());
+ buffer.getShortString(name);
+ buffer.getBin128(hash);
+}
+
+uint32_t ManagementAgent::SchemaClassKey::encodedSize() const {
+ return 1 + name.size() + 16 /* bin128 */;
+}
+
+void ManagementAgent::SchemaClass::encode(framing::Buffer& outBuf) const {
+ outBuf.checkAvailable(encodedSize());
+ outBuf.putOctet(kind);
+ outBuf.putLong(pendingSequence);
+ outBuf.putLongString(data);
+}
+
+void ManagementAgent::SchemaClass::decode(framing::Buffer& inBuf) {
+ inBuf.checkAvailable(encodedSize());
+ kind = inBuf.getOctet();
+ pendingSequence = inBuf.getLong();
+ inBuf.getLongString(data);
+}
+
+uint32_t ManagementAgent::SchemaClass::encodedSize() const {
+ return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size();
+}
+
+void ManagementAgent::exportSchemas(std::string& out) {
+ out.clear();
+ for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) {
+ string name = i->first;
+ const ClassMap& classes = i ->second;
+ for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) {
+ const SchemaClassKey& key = j->first;
+ const SchemaClass& klass = j->second;
+ if (klass.writeSchemaCall == 0) { // Ignore built-in schemas.
+ // Encode name, schema-key, schema-class
+ size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize();
+ size_t end = out.size();
+ out.resize(end + encodedSize);
+ framing::Buffer outBuf(&out[end], encodedSize);
+ outBuf.putShortString(name);
+ key.encode(outBuf);
+ klass.encode(outBuf);
+ }
+ }
+ }
+}
+
+void ManagementAgent::importSchemas(framing::Buffer& inBuf) {
+ while (inBuf.available()) {
+ string package;
+ SchemaClassKey key;
+ SchemaClass klass;
+ inBuf.getShortString(package);
+ key.decode(inBuf);
+ klass.decode(inBuf);
+ packages[package][key] = klass;
+ }
+}
+
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index c64c073d8c..1c06e5896b 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -97,7 +97,13 @@ public:
/** Disallow a method. Attempts to call it will receive an exception with message. */
void disallow(const std::string& className, const std::string& methodName, const std::string& message);
-
+
+ /** Serialize my schemas as a binary blob into schemaOut */
+ void exportSchemas(std::string& schemaOut);
+
+ /** Decode a serialized schemas and add to my schema cache */
+ void importSchemas(framing::Buffer& inBuf);
+
private:
struct Periodic : public qpid::sys::TimerTask
{
@@ -140,6 +146,10 @@ private:
{
std::string name;
uint8_t hash[16];
+
+ void encode(framing::Buffer& buffer) const;
+ void decode(framing::Buffer& buffer);
+ uint32_t encodedSize() const;
};
struct SchemaClassKeyComp
@@ -156,20 +166,24 @@ private:
}
};
+
struct SchemaClass
{
uint8_t kind;
ManagementObject::writeSchemaCall_t writeSchemaCall;
+ std::string data;
uint32_t pendingSequence;
- size_t bufferLen;
- uint8_t* buffer;
- SchemaClass(uint8_t _kind, uint32_t seq) :
- kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
+ SchemaClass(uint8_t _kind=0, uint32_t seq=0) :
+ kind(_kind), writeSchemaCall(0), pendingSequence(seq) {}
SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
- kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
- bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
+ kind(_kind), writeSchemaCall(call), pendingSequence(0) {}
+ bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
void appendSchema (framing::Buffer& buf);
+
+ void encode(framing::Buffer& buffer) const;
+ void decode(framing::Buffer& buffer);
+ uint32_t encodedSize() const;
};
typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 2155b6374e..0359514294 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -226,5 +226,10 @@
<field name="queue" type="str8"/>
<field name="consumer" type="uint32"/>
</control>
+
+ <!-- Replicate management agent schema -->
+ <control name="management-schema" code="0x35">
+ <field name="data" type="vbin32"/>
+ </control>
</class>
</amqp>