diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 32 |
4 files changed, 36 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index b6ee2db362..92e2b65fe2 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -198,8 +198,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { framing::cluster::StoreState(storeState), shutdownId, firstConfig, l); } - void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& current) { cluster.configChange(member, current, l); } + void ready(const std::string& url) { + cluster.ready(member, url, l); + } + void configChange(const std::string& current) { + cluster.configChange(member, current, l); + } void updateOffer(uint64_t updatee) { cluster.updateOffer(member, updatee, l); } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 6385729a09..1166f685d2 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -542,7 +542,7 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) { void Connection::managementSchema(const std::string& data) { management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); if (!agent) - throw Exception(QPID_MSG("Management schema update but no management agent.")); + throw Exception(QPID_MSG("Management schema update but management not enabled.")); framing::Buffer buf(const_cast<char*>(data.data()), data.size()); agent->importSchemas(buf); QPID_LOG(debug, cluster << " updated management schemas"); @@ -557,7 +557,7 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence) << objectNum << " seq " << bootSequence); management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); if (!agent) - throw Exception(QPID_MSG("Management schema update but no management agent.")); + throw Exception(QPID_MSG("Management schema update but management not enabled.")); agent->setNextObjectId(objectNum); agent->setBootSequence(bootSequence); } @@ -565,7 +565,7 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence) void Connection::managementAgents(const std::string& data) { management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); if (!agent) - throw Exception(QPID_MSG("Management agents update but no management agent.")); + throw Exception(QPID_MSG("Management agent update but management not enabled.")); framing::Buffer buf(const_cast<char*>(data.data()), data.size()); agent->importAgents(buf); QPID_LOG(debug, cluster << " updated management agents"); diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h index 26a99fa0b0..eedc99b0b2 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/cpp/src/qpid/cluster/InitialStatusMap.h @@ -31,6 +31,11 @@ namespace cluster { /** * Track status of cluster members during initialization. + * + * When a new member joins the CPG cluster, all members send an initial-status + * control. This map tracks those controls and provides data to make descisions + * about joining the cluster. + * */ class InitialStatusMap { @@ -38,7 +43,7 @@ class InitialStatusMap typedef framing::ClusterInitialStatusBody Status; InitialStatusMap(const MemberId& self, size_t size); - /** Process a config change. @return true if we need to re-send our status */ + /** Process a config change. May make isResendNeeded() true. */ void configChange(const MemberSet& newConfig); /** @return true if we need to re-send status */ bool isResendNeeded(); @@ -52,7 +57,7 @@ class InitialStatusMap bool transitionToComplete(); /**@pre isComplete(). @return this node's elders */ MemberSet getElders() const; - /**@pre isComplete(). @return True if we need an update. */ + /**@pre isComplete(). @return True if we need to request an update. */ bool isUpdateNeeded(); /**@pre isComplete(). @return Cluster-wide cluster ID. */ framing::Uuid getClusterId(); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 918acfe2c4..4454d70427 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -285,7 +285,7 @@ void ManagementAgent::clientAdded (const std::string& routingKey) } void ManagementAgent::clusterUpdate() { - // Called on all cluster memebesr when a new member joins a cluster. + // Called on all cluster memebers when a new member joins a cluster. // Set clientWasAdded so that on the next periodicProcessing we will do // a full update on all cluster members. clientWasAdded = true; @@ -1450,7 +1450,11 @@ void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const { outBuf.putLong(brokerBank); outBuf.putLong(agentBank); outBuf.putShortString(routingKey); - connectionRef.encode(outBuf); + // TODO aconway 2010-03-04: we send the v2Key instead of the + // ObjectId because that has the same meaning on different + // brokers. ObjectId::encode doesn't currently encode the v2Key, + // this can be cleaned up when it does. + outBuf.putMediumString(connectionRef.getV2Key()); mgmtObject->writeProperties(outBuf); } @@ -1458,16 +1462,24 @@ void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) { brokerBank = inBuf.getLong(); agentBank = inBuf.getLong(); inBuf.getShortString(routingKey); - connectionRef.decode(inBuf); + + // TODO aconway 2010-03-04: see comment in encode() + string connectionKey; + inBuf.getMediumString(connectionKey); + connectionRef = ObjectId(); // Clear out any existing value. + connectionRef.setV2Key(connectionKey); + mgmtObject = new _qmf::Agent(&agent, this); mgmtObject->readProperties(inBuf); - agent.addObject(mgmtObject, 0); + // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key. + mgmtObject->set_connectionRef(connectionRef); } uint32_t ManagementAgent::RemoteAgent::encodedSize() const { + // TODO aconway 2010-03-04: see comment in encode() return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long + routingKey.size() + sizeof(uint8_t) // ShortString - + connectionRef.encodedSize() + + connectionRef.getV2Key().size() + sizeof(uint16_t) // medium string + mgmtObject->writePropertiesSize(); } @@ -1477,25 +1489,21 @@ void ManagementAgent::exportAgents(std::string& out) { i != remoteAgents.end(); ++i) { - ObjectId id = i->first; + // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode RemoteAgent* agent = i->second; - size_t encodedSize = id.encodedSize() + agent->encodedSize(); + size_t encodedSize = agent->encodedSize(); size_t end = out.size(); out.resize(end + encodedSize); framing::Buffer outBuf(&out[end], encodedSize); - id.encode(outBuf); agent->encode(outBuf); } } void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { while (inBuf.available()) { - ObjectId id; - inBuf.checkAvailable(id.encodedSize()); - id.decode(inBuf); std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this)); agent->decode(inBuf); - addObject (agent->mgmtObject, 0); + addObject(agent->mgmtObject, 0); remoteAgents[agent->connectionRef] = agent.release(); } } |