summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-02-05 18:17:57 +0000
committerAlan Conway <aconway@apache.org>2010-02-05 18:17:57 +0000
commit0264d1db06cdf57f40aa0f53d83139dab77393bc (patch)
tree9dfd71fdcd329892a4e1d89bf83fde09fd0d6e79
parent04c7c8308c1ff5248c6f5a6e2b8712e100ebad47 (diff)
downloadqpid-python-0264d1db06cdf57f40aa0f53d83139dab77393bc.tar.gz
Synchronize management agent lists during cluster update.
- replicate management agent lists during cluster update. - suppress management agent output during update. - on join all members force full output at next periodic processing. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@907030 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp10
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp69
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp127
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h22
-rw-r--r--qpid/cpp/xml/cluster.xml5
8 files changed, 184 insertions, 66 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index d10e1fd458..eb6428d394 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -590,6 +590,7 @@ void Cluster::initMapCompleted(Lock& l) {
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
+ if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
state = JOINER;
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
QPID_LOG(notice, *this << " joining cluster " << name);
@@ -672,7 +673,7 @@ void Cluster::debugSnapshot(const char* prefix, Connection* connection) {
assertClusterSafe();
std::ostringstream msg;
msg << prefix;
- if (connection) msg << " " << *connection;
+ if (connection) msg << " " << connection->getId();
msg << " snapshot " << map.getFrameSeq() << ":";
AppendQueue append(msg);
broker.getQueues().eachQueue(append);
@@ -761,7 +762,10 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
<< " to " << updatee);
deliverEventQueue.start(); // Not involved in update.
}
- if (updatee != self && url) debugSnapshot("join");
+ if (updatee != self && url) {
+ debugSnapshot("join");
+ if (mAgent) mAgent->clusterUpdate();
+ }
}
static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
@@ -830,9 +834,11 @@ void Cluster::checkUpdateIn(Lock& l) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
broker.setClusterUpdatee(false);
+ if (mAgent) mAgent->suppress(false); // Enable management output.
discarding = false; // ok to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
debugSnapshot("initial");
+ if (mAgent) mAgent->clusterUpdate();
deliverEventQueue.start();
}
else if (updateRetracted) { // Update was retracted, request another update
@@ -992,10 +998,12 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
}
void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
+ QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name)
timer->deliverWakeup(name);
}
void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
+ QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
timer->deliverDrop(name);
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 5faa184e30..3ce2b3f376 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -197,7 +197,6 @@ struct GiveReadCreditOnExit {
void Connection::deliverDoOutput(uint32_t limit) {
output.deliverDoOutput(limit);
- cluster.debugSnapshot("deliver-do-output", this);
}
// Called in delivery thread, in cluster order.
@@ -532,5 +531,14 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence)
agent->setBootSequence(bootSequence);
}
+void Connection::managementAgents(const std::string& data) {
+ management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
+ if (!agent)
+ throw Exception(QPID_MSG("Management agents update but no management agent."));
+ framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+ agent->importAgents(buf);
+ QPID_LOG(debug, cluster << " updated management agents");
+}
+
}} // Namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 9a4e52a9d6..a2f96782f7 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -164,6 +164,7 @@ class Connection :
void addQueueListener(const std::string& queue, uint32_t listener);
void managementSchema(const std::string& data);
+ void managementAgents(const std::string& data);
void managementSetupState(uint64_t objectNum, uint16_t bootSequence);
uint32_t getSsf() const { return connectionCtor.ssf; }
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 6c8bb7e890..36efdfba65 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -53,6 +53,7 @@
#include "qpid/framing/TypeCode.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
+#include "qmf/org/apache/qpid/broker/ManagementSetupState.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
#include <algorithm>
@@ -128,15 +129,7 @@ void UpdateClient::update() {
<< " at " << updateeUrl);
Broker& b = updaterBroker;
- //
- // Bash the state of the slave into conformance with ours. The
- // goal here is to get his state arranged so as to mimic our
- // state, w/r/t object ID creation. Currently, that means that we
- // propagate our boot seq and object UID counter to him so that
- // subsequently created objects on his side will track what's on
- // our side.
- //
- updateManagementSetupState(b);
+ updateManagementSetupState();
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
@@ -154,16 +147,8 @@ void UpdateClient::update() {
ClusterConnectionProxy(session).expiryId(expiry.getId());
- // FIXME aconway 2010-01-08: we should enforce that all cluster members
- // have mgmt enabled or none of them do.
-
- management::ManagementAgent* agent = updaterBroker.getManagementAgent();
- if (agent) {
- string schemaData;
- agent->exportSchemas(schemaData);
- ClusterConnectionProxy(session).managementSchema(schemaData);
- }
-
+ updateManagementAgent();
+
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
@@ -184,21 +169,41 @@ template <class T> std::string encode(const T& t) {
}
} // namespace
-//
-// Propagate the management setup state block, currently consisting of
-// object number counter and boot sequence counter, to the slave.
-//
-void UpdateClient::updateManagementSetupState(Broker & b)
+
+// Propagate the management state
+void UpdateClient::updateManagementSetupState()
{
management::ManagementAgent* agent = updaterBroker.getManagementAgent();
- if (agent) {
- qmf::org::apache::qpid::broker::ManagementSetupState mss(b.getManagementAgent(), 0);
- mss.set_objectNum(b.getManagementAgent()->getNextObjectId());
- mss.set_bootSequence(b.getManagementAgent()->getBootSequence());
- QPID_LOG(debug, updaterId << " updating management-setup-state " << mss.get_objectNum()
- << " " << mss.get_bootSequence() << "\n");
- ClusterConnectionProxy(session).managementSetupState(mss.get_objectNum(), mss.get_bootSequence());
- }
+ if (!agent) return;
+
+ //
+ // Bash the state of the slave into conformance with ours. The
+ // goal here is to get his state arranged so as to mimic our
+ // state, w/r/t object ID creation. Currently, that means that we
+ // propagate our boot seq and object UID counter to him so that
+ // subsequently created objects on his side will track what's on
+ // our side.
+ //
+ qmf::org::apache::qpid::broker::ManagementSetupState mss(agent, 0);
+ mss.set_objectNum(agent->getNextObjectId());
+ mss.set_bootSequence(agent->getBootSequence());
+ QPID_LOG(debug, updaterId << " updating management-setup-state "
+ << mss.get_objectNum()
+ << " " << mss.get_bootSequence() << "\n");
+ ClusterConnectionProxy(session).managementSetupState(
+ mss.get_objectNum(), mss.get_bootSequence());
+}
+
+void UpdateClient::updateManagementAgent()
+{
+ management::ManagementAgent* agent = updaterBroker.getManagementAgent();
+ if (!agent) return;
+ // Send management schemas and agents.
+ string data;
+ agent->exportSchemas(data);
+ ClusterConnectionProxy(session).managementSchema(data);
+ agent->exportAgents(data);
+ ClusterConnectionProxy(session).managementAgents(data);
}
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 7407b7450b..be09af7e81 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -29,7 +29,6 @@
#include "qpid/client/AsyncSession.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Runnable.h"
-#include "qmf/org/apache/qpid/broker/ManagementSetupState.h"
#include <boost/shared_ptr.hpp>
@@ -98,7 +97,8 @@ class UpdateClient : public sys::Runnable {
void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
void updateQueueListeners(const boost::shared_ptr<broker::Queue>&);
void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c);
- void updateManagementSetupState(broker::Broker & b);
+ void updateManagementSetupState();
+ void updateManagementAgent();
Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
MemberId updaterId;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 460a11d0f0..e21edb4051 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -32,6 +32,7 @@
#include <list>
#include <iostream>
#include <fstream>
+#include <sstream>
using boost::intrusive_ptr;
using qpid::framing::Uuid;
@@ -53,7 +54,8 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
ManagementAgent::ManagementAgent () :
threadPoolSize(1), interval(10), broker(0), timer(0),
- startTime(uint64_t(Duration(now())))
+ startTime(uint64_t(Duration(now()))),
+ suppressed(false)
{
nextObjectId = 1;
brokerBank = 1;
@@ -87,7 +89,7 @@ ManagementAgent::~ManagementAgent ()
}
void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
- qpid::broker::Broker* _broker, int _threads)
+ qpid::broker::Broker* _broker, int _threads)
{
dataDir = _dataDir;
interval = _interval;
@@ -151,16 +153,16 @@ void ManagementAgent::writeData ()
}
void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
- qpid::broker::Exchange::shared_ptr _dexchange)
+ qpid::broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
}
void ManagementAgent::registerClass (const string& packageName,
- const string& className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+ const string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -168,9 +170,9 @@ void ManagementAgent::registerClass (const string& packageName,
}
void ManagementAgent::registerEvent (const string& packageName,
- const string& eventName,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+ const string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -240,7 +242,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
: TimerTask (qpid::sys::Duration((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC),
"ManagementAgent::periodicProcessing"),
- agent(_agent) {}
+ agent(_agent) {}
ManagementAgent::Periodic::~Periodic () {}
@@ -271,6 +273,14 @@ void ManagementAgent::clientAdded (const std::string& routingKey)
}
}
+void ManagementAgent::clusterUpdate() {
+ // Called on all cluster memebesr when a new member joins a cluster.
+ // Set clientWasAdded so that on the next periodicProcessing we will do
+ // a full update on all cluster members.
+ clientWasAdded = true;
+ debugSnapshot("update");
+}
+
void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
@@ -293,12 +303,15 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
}
void ManagementAgent::sendBuffer(Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- string routingKey)
+ uint32_t length,
+ qpid::broker::Exchange::shared_ptr exchange,
+ string routingKey)
{
- if (exchange.get() == 0)
+ if (suppressed) {
+ QPID_LOG(trace, "Suppressed management message to " << routingKey);
return;
+ }
+ if (exchange.get() == 0) return;
intrusive_ptr<Message> msg(new Message());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
@@ -341,7 +354,7 @@ void ManagementAgent::periodicProcessing (void)
#define BUFSIZE 65536
#define HEADROOM 4096
QPID_LOG(trace, "Management agent periodic processing")
- Mutex::ScopedLock lock (userLock);
+ Mutex::ScopedLock lock (userLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
@@ -452,6 +465,7 @@ void ManagementAgent::periodicProcessing (void)
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
}
+ debugSnapshot("periodic");
}
void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
@@ -481,7 +495,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
}
void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
- uint32_t code, string text)
+ uint32_t code, string text)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -497,8 +511,8 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
}
bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
- const string& routingKey,
- const FieldTable* /*args*/)
+ const string& routingKey,
+ const FieldTable* /*args*/)
{
Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
@@ -533,7 +547,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
}
void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
- uint32_t sequence, const ConnectionToken* connToken)
+ uint32_t sequence, const ConnectionToken* connToken)
{
string methodName;
string packageName;
@@ -562,7 +576,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
- return;
+ return;
}
if (acl != 0) {
@@ -578,7 +592,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
- return;
+ return;
}
}
@@ -917,7 +931,6 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
addObject (agent->mgmtObject, 0, true);
-
remoteAgents[connectionRef] = agent;
QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
@@ -1062,7 +1075,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
- }
+ }
return false;
}
@@ -1135,7 +1148,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string
sendBuffer (outBuffer, outLen, mExchange, "schema.package");
QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package")
- return result.first;
+ return result.first;
}
void ManagementAgent::addClassLH(uint8_t kind,
@@ -1366,3 +1379,69 @@ void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) {
}
}
+void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const {
+ outBuf.checkAvailable(encodedSize());
+ outBuf.putLong(brokerBank);
+ outBuf.putLong(agentBank);
+ outBuf.putShortString(routingKey);
+ connectionRef.encode(outBuf);
+ mgmtObject->writeProperties(outBuf);
+}
+
+void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) {
+ brokerBank = inBuf.getLong();
+ agentBank = inBuf.getLong();
+ inBuf.getShortString(routingKey);
+ connectionRef.decode(inBuf);
+ mgmtObject = new _qmf::Agent(&agent, this);
+ mgmtObject->readProperties(inBuf);
+ agent.addObject(mgmtObject, 0, true);
+}
+
+uint32_t ManagementAgent::RemoteAgent::encodedSize() const {
+ return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long
+ + routingKey.size() + sizeof(uint8_t) // ShortString
+ + connectionRef.encodedSize()
+ + mgmtObject->writePropertiesSize();
+}
+
+void ManagementAgent::exportAgents(std::string& out) {
+ out.clear();
+ for (RemoteAgentMap::const_iterator i = remoteAgents.begin();
+ i != remoteAgents.end();
+ ++i)
+ {
+ ObjectId id = i->first;
+ RemoteAgent* agent = i->second;
+ size_t encodedSize = id.encodedSize() + agent->encodedSize();
+ size_t end = out.size();
+ out.resize(end + encodedSize);
+ framing::Buffer outBuf(&out[end], encodedSize);
+ id.encode(outBuf);
+ agent->encode(outBuf);
+ }
+}
+
+void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
+ while (inBuf.available()) {
+ ObjectId id;
+ inBuf.checkAvailable(id.encodedSize());
+ id.decode(inBuf);
+ std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this));
+ agent->decode(inBuf);
+ addObject (agent->mgmtObject, 0, false);
+ remoteAgents[agent->connectionRef] = agent.release();
+ }
+}
+
+void ManagementAgent::debugSnapshot(const char* type) {
+ std::ostringstream msg;
+ msg << type << " snapshot, agents:";
+ for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
+ i != remoteAgents.end(); ++i)
+ msg << " " << i->second->routingKey;
+ msg << " packages: " << packages.size();
+ msg << " objects: " << managementObjects.size();
+ msg << " new objects: " << newManagementObjects.size();
+ QPID_LOG(trace, msg.str());
+}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index b9ac54c064..ea04a6cb72 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -71,6 +71,9 @@ public:
/** Called after plugins are initialized. */
void pluginsInitialized();
+ /** Called by cluster to suppress management output during update. */
+ void suppress(bool s) { suppressed = s; }
+
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
@@ -90,6 +93,8 @@ public:
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
+ QPID_BROKER_EXTERN void clusterUpdate();
+
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
@@ -105,9 +110,15 @@ public:
/** Serialize my schemas as a binary blob into schemaOut */
void exportSchemas(std::string& schemaOut);
+ /** Serialize my remote-agent map as a binary blob into agentsOut */
+ void exportAgents(std::string& agentsOut);
+
/** Decode a serialized schemas and add to my schema cache */
void importSchemas(framing::Buffer& inBuf);
+ /** Decode a serialized agent map */
+ void importAgents(framing::Buffer& inBuf);
+
// these are in support of the managementSetup-state stuff, for synch'ing clustered brokers
uint64_t getNextObjectId(void) { return nextObjectId; }
void setNextObjectId(uint64_t o) { nextObjectId = o; }
@@ -136,9 +147,13 @@ private:
std::string routingKey;
ObjectId connectionRef;
qmf::org::apache::qpid::broker::Agent* mgmtObject;
- RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
+ RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {}
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
+
virtual ~RemoteAgent ();
+ void encode(framing::Buffer& buffer) const;
+ void decode(framing::Buffer& buffer);
+ uint32_t encodedSize() const;
};
// TODO: Eventually replace string with entire reply-to structure. reply-to
@@ -205,9 +220,6 @@ private:
ManagementObjectMap managementObjects;
ManagementObjectMap newManagementObjects;
- static ManagementAgent* agent;
- static bool enabled;
-
framing::Uuid uuid;
sys::Mutex addLock;
sys::Mutex userLock;
@@ -224,6 +236,7 @@ private:
uint32_t nextRequestSequence;
bool clientWasAdded;
const uint64_t startTime;
+ bool suppressed;
std::auto_ptr<IdAllocator> allocator;
@@ -282,6 +295,7 @@ private:
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);
size_t validateEventSchema(framing::Buffer&);
+ void debugSnapshot(const char*);
};
}}
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 33553fe7f8..a879d5137b 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -248,6 +248,9 @@
<field name="bootSequence" type="uint16"/>
</control>
-
+ <!-- Replicate management agent's remote-agent map -->
+ <control name="management-agents" code="0x37">
+ <field name="data" type="vbin32"/>
+ </control>
</class>
</amqp>