diff options
author | Alan Conway <aconway@apache.org> | 2010-02-05 18:17:57 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-02-05 18:17:57 +0000 |
commit | 0264d1db06cdf57f40aa0f53d83139dab77393bc (patch) | |
tree | 9dfd71fdcd329892a4e1d89bf83fde09fd0d6e79 | |
parent | 04c7c8308c1ff5248c6f5a6e2b8712e100ebad47 (diff) | |
download | qpid-python-0264d1db06cdf57f40aa0f53d83139dab77393bc.tar.gz |
Synchronize management agent lists during cluster update.
- replicate management agent lists during cluster update.
- suppress management agent output during update.
- on join all members force full output at next periodic processing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@907030 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 69 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 127 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 22 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 5 |
8 files changed, 184 insertions, 66 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index d10e1fd458..eb6428d394 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -590,6 +590,7 @@ void Cluster::initMapCompleted(Lock& l) { if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); + if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. state = JOINER; mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); QPID_LOG(notice, *this << " joining cluster " << name); @@ -672,7 +673,7 @@ void Cluster::debugSnapshot(const char* prefix, Connection* connection) { assertClusterSafe(); std::ostringstream msg; msg << prefix; - if (connection) msg << " " << *connection; + if (connection) msg << " " << connection->getId(); msg << " snapshot " << map.getFrameSeq() << ":"; AppendQueue append(msg); broker.getQueues().eachQueue(append); @@ -761,7 +762,10 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) << " to " << updatee); deliverEventQueue.start(); // Not involved in update. } - if (updatee != self && url) debugSnapshot("join"); + if (updatee != self && url) { + debugSnapshot("join"); + if (mAgent) mAgent->clusterUpdate(); + } } static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { @@ -830,9 +834,11 @@ void Cluster::checkUpdateIn(Lock& l) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; broker.setClusterUpdatee(false); + if (mAgent) mAgent->suppress(false); // Enable management output. discarding = false; // ok to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); debugSnapshot("initial"); + if (mAgent) mAgent->clusterUpdate(); deliverEventQueue.start(); } else if (updateRetracted) { // Update was retracted, request another update @@ -992,10 +998,12 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { + QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name) timer->deliverWakeup(name); } void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { + QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) timer->deliverDrop(name); } diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 5faa184e30..3ce2b3f376 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -197,7 +197,6 @@ struct GiveReadCreditOnExit { void Connection::deliverDoOutput(uint32_t limit) { output.deliverDoOutput(limit); - cluster.debugSnapshot("deliver-do-output", this); } // Called in delivery thread, in cluster order. @@ -532,5 +531,14 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence) agent->setBootSequence(bootSequence); } +void Connection::managementAgents(const std::string& data) { + management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); + if (!agent) + throw Exception(QPID_MSG("Management agents update but no management agent.")); + framing::Buffer buf(const_cast<char*>(data.data()), data.size()); + agent->importAgents(buf); + QPID_LOG(debug, cluster << " updated management agents"); +} + }} // Namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 9a4e52a9d6..a2f96782f7 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -164,6 +164,7 @@ class Connection : void addQueueListener(const std::string& queue, uint32_t listener); void managementSchema(const std::string& data); + void managementAgents(const std::string& data); void managementSetupState(uint64_t objectNum, uint16_t bootSequence); uint32_t getSsf() const { return connectionCtor.ssf; } diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 6c8bb7e890..36efdfba65 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -53,6 +53,7 @@ #include "qpid/framing/TypeCode.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" +#include "qmf/org/apache/qpid/broker/ManagementSetupState.h" #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> @@ -128,15 +129,7 @@ void UpdateClient::update() { << " at " << updateeUrl); Broker& b = updaterBroker; - // - // Bash the state of the slave into conformance with ours. The - // goal here is to get his state arranged so as to mimic our - // state, w/r/t object ID creation. Currently, that means that we - // propagate our boot seq and object UID counter to him so that - // subsequently created objects on his side will track what's on - // our side. - // - updateManagementSetupState(b); + updateManagementSetupState(); b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); @@ -154,16 +147,8 @@ void UpdateClient::update() { ClusterConnectionProxy(session).expiryId(expiry.getId()); - // FIXME aconway 2010-01-08: we should enforce that all cluster members - // have mgmt enabled or none of them do. - - management::ManagementAgent* agent = updaterBroker.getManagementAgent(); - if (agent) { - string schemaData; - agent->exportSchemas(schemaData); - ClusterConnectionProxy(session).managementSchema(schemaData); - } - + updateManagementAgent(); + ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); @@ -184,21 +169,41 @@ template <class T> std::string encode(const T& t) { } } // namespace -// -// Propagate the management setup state block, currently consisting of -// object number counter and boot sequence counter, to the slave. -// -void UpdateClient::updateManagementSetupState(Broker & b) + +// Propagate the management state +void UpdateClient::updateManagementSetupState() { management::ManagementAgent* agent = updaterBroker.getManagementAgent(); - if (agent) { - qmf::org::apache::qpid::broker::ManagementSetupState mss(b.getManagementAgent(), 0); - mss.set_objectNum(b.getManagementAgent()->getNextObjectId()); - mss.set_bootSequence(b.getManagementAgent()->getBootSequence()); - QPID_LOG(debug, updaterId << " updating management-setup-state " << mss.get_objectNum() - << " " << mss.get_bootSequence() << "\n"); - ClusterConnectionProxy(session).managementSetupState(mss.get_objectNum(), mss.get_bootSequence()); - } + if (!agent) return; + + // + // Bash the state of the slave into conformance with ours. The + // goal here is to get his state arranged so as to mimic our + // state, w/r/t object ID creation. Currently, that means that we + // propagate our boot seq and object UID counter to him so that + // subsequently created objects on his side will track what's on + // our side. + // + qmf::org::apache::qpid::broker::ManagementSetupState mss(agent, 0); + mss.set_objectNum(agent->getNextObjectId()); + mss.set_bootSequence(agent->getBootSequence()); + QPID_LOG(debug, updaterId << " updating management-setup-state " + << mss.get_objectNum() + << " " << mss.get_bootSequence() << "\n"); + ClusterConnectionProxy(session).managementSetupState( + mss.get_objectNum(), mss.get_bootSequence()); +} + +void UpdateClient::updateManagementAgent() +{ + management::ManagementAgent* agent = updaterBroker.getManagementAgent(); + if (!agent) return; + // Send management schemas and agents. + string data; + agent->exportSchemas(data); + ClusterConnectionProxy(session).managementSchema(data); + agent->exportAgents(data); + ClusterConnectionProxy(session).managementAgents(data); } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 7407b7450b..be09af7e81 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -29,7 +29,6 @@ #include "qpid/client/AsyncSession.h" #include "qpid/broker/SemanticState.h" #include "qpid/sys/Runnable.h" -#include "qmf/org/apache/qpid/broker/ManagementSetupState.h" #include <boost/shared_ptr.hpp> @@ -98,7 +97,8 @@ class UpdateClient : public sys::Runnable { void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); void updateQueueListeners(const boost::shared_ptr<broker::Queue>&); void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c); - void updateManagementSetupState(broker::Broker & b); + void updateManagementSetupState(); + void updateManagementAgent(); Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering; MemberId updaterId; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 460a11d0f0..e21edb4051 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -32,6 +32,7 @@ #include <list> #include <iostream> #include <fstream> +#include <sstream> using boost::intrusive_ptr; using qpid::framing::Uuid; @@ -53,7 +54,8 @@ ManagementAgent::RemoteAgent::~RemoteAgent () ManagementAgent::ManagementAgent () : threadPoolSize(1), interval(10), broker(0), timer(0), - startTime(uint64_t(Duration(now()))) + startTime(uint64_t(Duration(now()))), + suppressed(false) { nextObjectId = 1; brokerBank = 1; @@ -87,7 +89,7 @@ ManagementAgent::~ManagementAgent () } void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, - qpid::broker::Broker* _broker, int _threads) + qpid::broker::Broker* _broker, int _threads) { dataDir = _dataDir; interval = _interval; @@ -151,16 +153,16 @@ void ManagementAgent::writeData () } void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, - qpid::broker::Exchange::shared_ptr _dexchange) + qpid::broker::Exchange::shared_ptr _dexchange) { mExchange = _mexchange; dExchange = _dexchange; } void ManagementAgent::registerClass (const string& packageName, - const string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) + const string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = findOrAddPackageLH(packageName); @@ -168,9 +170,9 @@ void ManagementAgent::registerClass (const string& packageName, } void ManagementAgent::registerEvent (const string& packageName, - const string& eventName, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) + const string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = findOrAddPackageLH(packageName); @@ -240,7 +242,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) : TimerTask (qpid::sys::Duration((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC), "ManagementAgent::periodicProcessing"), - agent(_agent) {} + agent(_agent) {} ManagementAgent::Periodic::~Periodic () {} @@ -271,6 +273,14 @@ void ManagementAgent::clientAdded (const std::string& routingKey) } } +void ManagementAgent::clusterUpdate() { + // Called on all cluster memebesr when a new member joins a cluster. + // Set clientWasAdded so that on the next periodicProcessing we will do + // a full update on all cluster members. + clientWasAdded = true; + debugSnapshot("update"); +} + void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); @@ -293,12 +303,15 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) } void ManagementAgent::sendBuffer(Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - string routingKey) + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + string routingKey) { - if (exchange.get() == 0) + if (suppressed) { + QPID_LOG(trace, "Suppressed management message to " << routingKey); return; + } + if (exchange.get() == 0) return; intrusive_ptr<Message> msg(new Message()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); @@ -341,7 +354,7 @@ void ManagementAgent::periodicProcessing (void) #define BUFSIZE 65536 #define HEADROOM 4096 QPID_LOG(trace, "Management agent periodic processing") - Mutex::ScopedLock lock (userLock); + Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; @@ -452,6 +465,7 @@ void ManagementAgent::periodicProcessing (void) sendBuffer (msgBuffer, contentSize, mExchange, routingKey); QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); } + debugSnapshot("periodic"); } void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) @@ -481,7 +495,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) } void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, - uint32_t code, string text) + uint32_t code, string text) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -497,8 +511,8 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, } bool ManagementAgent::dispatchCommand (Deliverable& deliverable, - const string& routingKey, - const FieldTable* /*args*/) + const string& routingKey, + const FieldTable* /*args*/) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); @@ -533,7 +547,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, } void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, - uint32_t sequence, const ConnectionToken* connToken) + uint32_t sequence, const ConnectionToken* connToken) { string methodName; string packageName; @@ -562,7 +576,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence) - return; + return; } if (acl != 0) { @@ -578,7 +592,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) - return; + return; } } @@ -917,7 +931,6 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey agent->mgmtObject->set_brokerBank (brokerBank); agent->mgmtObject->set_agentBank (assignedBank); addObject (agent->mgmtObject, 0, true); - remoteAgents[connectionRef] = agent; QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); @@ -1062,7 +1075,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) - } + } return false; } @@ -1135,7 +1148,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string sendBuffer (outBuffer, outLen, mExchange, "schema.package"); QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package") - return result.first; + return result.first; } void ManagementAgent::addClassLH(uint8_t kind, @@ -1366,3 +1379,69 @@ void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) { } } +void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const { + outBuf.checkAvailable(encodedSize()); + outBuf.putLong(brokerBank); + outBuf.putLong(agentBank); + outBuf.putShortString(routingKey); + connectionRef.encode(outBuf); + mgmtObject->writeProperties(outBuf); +} + +void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) { + brokerBank = inBuf.getLong(); + agentBank = inBuf.getLong(); + inBuf.getShortString(routingKey); + connectionRef.decode(inBuf); + mgmtObject = new _qmf::Agent(&agent, this); + mgmtObject->readProperties(inBuf); + agent.addObject(mgmtObject, 0, true); +} + +uint32_t ManagementAgent::RemoteAgent::encodedSize() const { + return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long + + routingKey.size() + sizeof(uint8_t) // ShortString + + connectionRef.encodedSize() + + mgmtObject->writePropertiesSize(); +} + +void ManagementAgent::exportAgents(std::string& out) { + out.clear(); + for (RemoteAgentMap::const_iterator i = remoteAgents.begin(); + i != remoteAgents.end(); + ++i) + { + ObjectId id = i->first; + RemoteAgent* agent = i->second; + size_t encodedSize = id.encodedSize() + agent->encodedSize(); + size_t end = out.size(); + out.resize(end + encodedSize); + framing::Buffer outBuf(&out[end], encodedSize); + id.encode(outBuf); + agent->encode(outBuf); + } +} + +void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { + while (inBuf.available()) { + ObjectId id; + inBuf.checkAvailable(id.encodedSize()); + id.decode(inBuf); + std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this)); + agent->decode(inBuf); + addObject (agent->mgmtObject, 0, false); + remoteAgents[agent->connectionRef] = agent.release(); + } +} + +void ManagementAgent::debugSnapshot(const char* type) { + std::ostringstream msg; + msg << type << " snapshot, agents:"; + for (RemoteAgentMap::const_iterator i=remoteAgents.begin(); + i != remoteAgents.end(); ++i) + msg << " " << i->second->routingKey; + msg << " packages: " << packages.size(); + msg << " objects: " << managementObjects.size(); + msg << " new objects: " << newManagementObjects.size(); + QPID_LOG(trace, msg.str()); +} diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index b9ac54c064..ea04a6cb72 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -71,6 +71,9 @@ public: /** Called after plugins are initialized. */ void pluginsInitialized(); + /** Called by cluster to suppress management output during update. */ + void suppress(bool s) { suppressed = s; } + void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); @@ -90,6 +93,8 @@ public: severity_t severity = SEV_DEFAULT); QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); + QPID_BROKER_EXTERN void clusterUpdate(); + bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -105,9 +110,15 @@ public: /** Serialize my schemas as a binary blob into schemaOut */ void exportSchemas(std::string& schemaOut); + /** Serialize my remote-agent map as a binary blob into agentsOut */ + void exportAgents(std::string& agentsOut); + /** Decode a serialized schemas and add to my schema cache */ void importSchemas(framing::Buffer& inBuf); + /** Decode a serialized agent map */ + void importAgents(framing::Buffer& inBuf); + // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers uint64_t getNextObjectId(void) { return nextObjectId; } void setNextObjectId(uint64_t o) { nextObjectId = o; } @@ -136,9 +147,13 @@ private: std::string routingKey; ObjectId connectionRef; qmf::org::apache::qpid::broker::Agent* mgmtObject; - RemoteAgent(ManagementAgent& _agent) : agent(_agent) {} + RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {} ManagementObject* GetManagementObject (void) const { return mgmtObject; } + virtual ~RemoteAgent (); + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedSize() const; }; // TODO: Eventually replace string with entire reply-to structure. reply-to @@ -205,9 +220,6 @@ private: ManagementObjectMap managementObjects; ManagementObjectMap newManagementObjects; - static ManagementAgent* agent; - static bool enabled; - framing::Uuid uuid; sys::Mutex addLock; sys::Mutex userLock; @@ -224,6 +236,7 @@ private: uint32_t nextRequestSequence; bool clientWasAdded; const uint64_t startTime; + bool suppressed; std::auto_ptr<IdAllocator> allocator; @@ -282,6 +295,7 @@ private: size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); size_t validateEventSchema(framing::Buffer&); + void debugSnapshot(const char*); }; }} diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 33553fe7f8..a879d5137b 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -248,6 +248,9 @@ <field name="bootSequence" type="uint16"/> </control> - + <!-- Replicate management agent's remote-agent map --> + <control name="management-agents" code="0x37"> + <field name="data" type="vbin32"/> + </control> </class> </amqp> |