diff options
author | Alan Conway <aconway@apache.org> | 2010-03-08 17:34:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-03-08 17:34:09 +0000 |
commit | c1266e53dd9c20447f658b050fb789d4f0f9ab12 (patch) | |
tree | 513716e9b9b97f57c5493a9ec2e5186ef78c811e /cpp | |
parent | b97d2e0e9246a19eb155a38b9b06a9550ceb06aa (diff) | |
download | qpid-python-c1266e53dd9c20447f658b050fb789d4f0f9ab12.tar.gz |
QPID-2436: Fix cluster update of remote agents.
The v2key of cluster agents was not being passed as part of a cluster update.
This meant they were not being associated with the correct shadow connections on
the updatee. This caused inconsistencies that shut down the new broker.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@920414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 32 | ||||
-rw-r--r-- | cpp/src/tests/cluster_tests.fail | 2 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 24 | ||||
-rw-r--r-- | cpp/src/tests/test_env.sh.in | 2 | ||||
-rw-r--r-- | cpp/src/tests/testagent/Makefile.am | 5 |
8 files changed, 56 insertions, 32 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index b6ee2db362..92e2b65fe2 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -198,8 +198,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { framing::cluster::StoreState(storeState), shutdownId, firstConfig, l); } - void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& current) { cluster.configChange(member, current, l); } + void ready(const std::string& url) { + cluster.ready(member, url, l); + } + void configChange(const std::string& current) { + cluster.configChange(member, current, l); + } void updateOffer(uint64_t updatee) { cluster.updateOffer(member, updatee, l); } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 6385729a09..1166f685d2 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -542,7 +542,7 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) { void Connection::managementSchema(const std::string& data) { management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); if (!agent) - throw Exception(QPID_MSG("Management schema update but no management agent.")); + throw Exception(QPID_MSG("Management schema update but management not enabled.")); framing::Buffer buf(const_cast<char*>(data.data()), data.size()); agent->importSchemas(buf); QPID_LOG(debug, cluster << " updated management schemas"); @@ -557,7 +557,7 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence) << objectNum << " seq " << bootSequence); management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); if (!agent) - throw Exception(QPID_MSG("Management schema update but no management agent.")); + throw Exception(QPID_MSG("Management schema update but management not enabled.")); agent->setNextObjectId(objectNum); agent->setBootSequence(bootSequence); } @@ -565,7 +565,7 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence) void Connection::managementAgents(const std::string& data) { management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); if (!agent) - throw Exception(QPID_MSG("Management agents update but no management agent.")); + throw Exception(QPID_MSG("Management agent update but management not enabled.")); framing::Buffer buf(const_cast<char*>(data.data()), data.size()); agent->importAgents(buf); QPID_LOG(debug, cluster << " updated management agents"); diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h index 26a99fa0b0..eedc99b0b2 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/cpp/src/qpid/cluster/InitialStatusMap.h @@ -31,6 +31,11 @@ namespace cluster { /** * Track status of cluster members during initialization. + * + * When a new member joins the CPG cluster, all members send an initial-status + * control. This map tracks those controls and provides data to make descisions + * about joining the cluster. + * */ class InitialStatusMap { @@ -38,7 +43,7 @@ class InitialStatusMap typedef framing::ClusterInitialStatusBody Status; InitialStatusMap(const MemberId& self, size_t size); - /** Process a config change. @return true if we need to re-send our status */ + /** Process a config change. May make isResendNeeded() true. */ void configChange(const MemberSet& newConfig); /** @return true if we need to re-send status */ bool isResendNeeded(); @@ -52,7 +57,7 @@ class InitialStatusMap bool transitionToComplete(); /**@pre isComplete(). @return this node's elders */ MemberSet getElders() const; - /**@pre isComplete(). @return True if we need an update. */ + /**@pre isComplete(). @return True if we need to request an update. */ bool isUpdateNeeded(); /**@pre isComplete(). @return Cluster-wide cluster ID. */ framing::Uuid getClusterId(); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 918acfe2c4..4454d70427 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -285,7 +285,7 @@ void ManagementAgent::clientAdded (const std::string& routingKey) } void ManagementAgent::clusterUpdate() { - // Called on all cluster memebesr when a new member joins a cluster. + // Called on all cluster memebers when a new member joins a cluster. // Set clientWasAdded so that on the next periodicProcessing we will do // a full update on all cluster members. clientWasAdded = true; @@ -1450,7 +1450,11 @@ void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const { outBuf.putLong(brokerBank); outBuf.putLong(agentBank); outBuf.putShortString(routingKey); - connectionRef.encode(outBuf); + // TODO aconway 2010-03-04: we send the v2Key instead of the + // ObjectId because that has the same meaning on different + // brokers. ObjectId::encode doesn't currently encode the v2Key, + // this can be cleaned up when it does. + outBuf.putMediumString(connectionRef.getV2Key()); mgmtObject->writeProperties(outBuf); } @@ -1458,16 +1462,24 @@ void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) { brokerBank = inBuf.getLong(); agentBank = inBuf.getLong(); inBuf.getShortString(routingKey); - connectionRef.decode(inBuf); + + // TODO aconway 2010-03-04: see comment in encode() + string connectionKey; + inBuf.getMediumString(connectionKey); + connectionRef = ObjectId(); // Clear out any existing value. + connectionRef.setV2Key(connectionKey); + mgmtObject = new _qmf::Agent(&agent, this); mgmtObject->readProperties(inBuf); - agent.addObject(mgmtObject, 0); + // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key. + mgmtObject->set_connectionRef(connectionRef); } uint32_t ManagementAgent::RemoteAgent::encodedSize() const { + // TODO aconway 2010-03-04: see comment in encode() return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long + routingKey.size() + sizeof(uint8_t) // ShortString - + connectionRef.encodedSize() + + connectionRef.getV2Key().size() + sizeof(uint16_t) // medium string + mgmtObject->writePropertiesSize(); } @@ -1477,25 +1489,21 @@ void ManagementAgent::exportAgents(std::string& out) { i != remoteAgents.end(); ++i) { - ObjectId id = i->first; + // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode RemoteAgent* agent = i->second; - size_t encodedSize = id.encodedSize() + agent->encodedSize(); + size_t encodedSize = agent->encodedSize(); size_t end = out.size(); out.resize(end + encodedSize); framing::Buffer outBuf(&out[end], encodedSize); - id.encode(outBuf); agent->encode(outBuf); } } void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { while (inBuf.available()) { - ObjectId id; - inBuf.checkAvailable(id.encodedSize()); - id.decode(inBuf); std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this)); agent->decode(inBuf); - addObject (agent->mgmtObject, 0); + addObject(agent->mgmtObject, 0); remoteAgents[agent->connectionRef] = agent.release(); } } diff --git a/cpp/src/tests/cluster_tests.fail b/cpp/src/tests/cluster_tests.fail index 268795642d..b28b04f643 100644 --- a/cpp/src/tests/cluster_tests.fail +++ b/cpp/src/tests/cluster_tests.fail @@ -1,3 +1,3 @@ -cluster_tests.LongTests.test_management + diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 276ad1af2d..08f1697c7a 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -144,7 +144,7 @@ class LongTests(BrokerTest): i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) ErrorGenerator(b) - time.sleep(1) + time.sleep(min(5,self.duration()/2)) sender.stop() receiver.stop(sender.sent) for i in range(i, len(cluster)): cluster[i].kill() @@ -152,7 +152,7 @@ class LongTests(BrokerTest): def test_management(self): """Run management clients and other clients concurrently.""" - # FIXME aconway 2010-03-03: move to framework + # TODO aconway 2010-03-03: move to brokertest framework class ClientLoop(StoppableThread): """Run a client executable in a loop.""" def __init__(self, broker, cmd): @@ -173,14 +173,21 @@ class LongTests(BrokerTest): self.cmd, expect=EXPECT_UNKNOWN) finally: self.lock.release() try: exit = self.process.wait() - except: exit = 1 + except OSError, e: + # Seems to be a race in wait(), it throws + # "no such process" during test shutdown. + # Doesn't indicate a test error, ignore. + return + except Exception, e: + self.process.unexpected( + "client of %s: %s"%(self.broker.name, e)) self.lock.acquire() try: # Quit and ignore errors if stopped or expecting failure. if self.stopped: break if exit != 0: - self.process.unexpected("client of %s exit status %s" % - (self.broker.name, exit)) + self.process.unexpected( + "client of %s exit code %s"%(self.broker.name, exit)) finally: self.lock.release() except Exception, e: self.error = RethrownException("Error in ClientLoop.run") @@ -218,9 +225,7 @@ class LongTests(BrokerTest): ["perftest", "--count", 1000, "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], - [os.path.join(self.rootdir, "testagent/testagent"), "localhost", - str(broker.port())] - ]: + ["testagent", "localhost", str(broker.port())] ]: batch.append(ClientLoop(broker, cmd)) clients.append(batch) @@ -238,7 +243,7 @@ class LongTests(BrokerTest): start_mclients(b) while time.time() < endtime: - time.sleep(min(5,self.duration())) + time.sleep(min(5,self.duration()/2)) for b in cluster[alive:]: b.ready() # Check if a broker crashed. # Kill the first broker. Ignore errors on its clients and all the mclients for c in clients[alive] + mclients: c.expect_fail() @@ -252,7 +257,6 @@ class LongTests(BrokerTest): b = cluster.start() start_clients(b) for b in cluster[alive:]: start_mclients(b) - for c in chain(mclients, *clients): c.stop() diff --git a/cpp/src/tests/test_env.sh.in b/cpp/src/tests/test_env.sh.in index 07bd4b2bee..db92c636d3 100644 --- a/cpp/src/tests/test_env.sh.in +++ b/cpp/src/tests/test_env.sh.in @@ -54,7 +54,7 @@ export RECEIVER_EXEC=$QPID_TEST_EXEC_DIR/receiver export SENDER_EXEC=$QPID_TEST_EXEC_DIR/sender # Path -export PATH=$top_builddir/src:$builddir:$srcdir:$PYTHON_COMMANDS:$QPID_TEST_EXEC_DIR:$PATH +export PATH=$top_builddir/src:$builddir:$builddir/testagent:$srcdir:$PYTHON_COMMANDS:$QPID_TEST_EXEC_DIR:$PATH # Modules export TEST_STORE_LIB=$testmoduledir/test_store.so diff --git a/cpp/src/tests/testagent/Makefile.am b/cpp/src/tests/testagent/Makefile.am index 5ca1a383e8..160d8adb57 100644 --- a/cpp/src/tests/testagent/Makefile.am +++ b/cpp/src/tests/testagent/Makefile.am @@ -43,7 +43,10 @@ CLEANFILES=$(GEN_SRC) gen.timestamp INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src -Igen -noinst_PROGRAMS=testagent +qpidexecdir = $(libexecdir)/qpid +qpidtestdir = $(qpidexecdir)/tests + +qpidtest_PROGRAMS=testagent testagent_SOURCES=testagent.cpp $(GEN_SRC) testagent_LDADD=$(top_builddir)/src/libqmf.la |