diff options
author | Alan Conway <aconway@apache.org> | 2011-04-29 15:21:20 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-04-29 15:21:20 +0000 |
commit | db64c6662a3a1a54abb31a6368138ca0a3752f8c (patch) | |
tree | 8eafd27dc4f8d41caffaa6515fd8e7ef0c2f432f /qpid | |
parent | b513226cc2f834b17a88525fafa2290542b925bb (diff) | |
download | qpid-python-db64c6662a3a1a54abb31a6368138ca0a3752f8c.tar.gz |
QPID-3235: clustered qpidd broker fails ocassionly the cluster_tests.ShortTests.test_route_update
Inconsistent stats changes on a Link were causing cluster
inconsistency. Fix is to disable those stats changes in a cluster.
Updated cluster_tests.py to reliably generate the error every time
without the fix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1097838 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 6 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 8 |
4 files changed, 42 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index f3acf7c660..91861ade3f 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,7 +30,6 @@ #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" -#include "qpid/sys/ClusterSafe.h" using namespace qpid::broker; using qpid::framing::Buffer; @@ -57,8 +56,8 @@ Link::Link(LinkRegistry* _links, string& _password, Broker* _broker, Manageable* parent) - : links(_links), store(_store), host(_host), port(_port), - transport(_transport), + : links(_links), store(_store), host(_host), port(_port), + transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), mgmtObject(0), broker(_broker), state(0), @@ -97,7 +96,8 @@ void Link::setStateLH (int newState) return; state = newState; - if (mgmtObject == 0) + + if (hideManagement()) return; switch (state) @@ -122,7 +122,7 @@ void Link::startConnectionLH () QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); } catch(std::exception& e) { setStateLH(STATE_WAITING); - if (mgmtObject != 0) + if (!hideManagement()) mgmtObject->set_lastError (e.what()); } } @@ -133,8 +133,7 @@ void Link::established () addr << host << ":" << port; QPID_LOG (info, "Inter-broker link established to " << addr.str()); - // Don't raise the management event in a cluster, other members wont't get this call. - if (broker && !broker->isInCluster()) + if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); { @@ -154,12 +153,11 @@ void Link::closed (int, std::string text) connection = 0; - // Don't raise the management event in a cluster, other members wont't get this call. if (state == STATE_OPERATIONAL) { stringstream addr; addr << host << ":" << port; QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); - if (broker && !broker->isInCluster()) + if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } @@ -172,7 +170,7 @@ void Link::closed (int, std::string text) if (state != STATE_FAILED) { setStateLH(STATE_WAITING); - if (mgmtObject != 0) + if (!hideManagement()) mgmtObject->set_lastError (text); } @@ -221,7 +219,7 @@ void Link::cancel(Bridge::shared_ptr bridge) { { Mutex::ScopedLock mutex(lock); - + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { if ((*i).get() == bridge.get()) { created.erase(i); @@ -277,9 +275,9 @@ void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); - if (connection && updateUrls) { + if (connection && updateUrls) { urls.reset(connection->getKnownHosts()); - QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); updateUrls = false; } @@ -309,7 +307,7 @@ void Link::reconnect(const qpid::Address& a) port = a.port; transport = a.protocol; startConnectionLH(); - if (mgmtObject != 0) { + if (!hideManagement()) { stringstream errorString; errorString << "Failed over to " << a; mgmtObject->set_lastError(errorString.str()); @@ -319,7 +317,7 @@ void Link::reconnect(const qpid::Address& a) bool Link::tryFailover() { Address next; - if (urls.next(next) && + if (urls.next(next) && (next.host != host || next.port != port || next.protocol != transport)) { links->changeAddress(Address(transport, host, port), next); QPID_LOG(debug, "Link failing over to " << host << ":" << port); @@ -329,6 +327,12 @@ bool Link::tryFailover() } } +// Management updates for a linke are inconsistent in a cluster, so they are +// suppressed. +bool Link::hideManagement() const { + return !mgmtObject || ( broker && broker->isInCluster()); +} + uint Link::nextChannel() { Mutex::ScopedLock mutex(lock); @@ -341,7 +345,7 @@ void Link::notifyConnectionForced(const string text) Mutex::ScopedLock mutex(lock); setStateLH(STATE_FAILED); - if (mgmtObject != 0) + if (!hideManagement()) mgmtObject->set_lastError(text); } @@ -363,7 +367,7 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) string authMechanism; string username; string password; - + buffer.getShortString(host); port = buffer.getShort(); buffer.getShortString(transport); @@ -375,7 +379,7 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) return links.declare(host, port, transport, durable, authMechanism, username, password).first; } -void Link::encode(Buffer& buffer) const +void Link::encode(Buffer& buffer) const { buffer.putShortString(string("link")); buffer.putShortString(host); @@ -387,8 +391,8 @@ void Link::encode(Buffer& buffer) const buffer.putShortString(password); } -uint32_t Link::encodedSize() const -{ +uint32_t Link::encodedSize() const +{ return host.size() + 1 // short-string (host) + 5 // short-string ("link") + 2 // port diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 75a680ff5d..4badd8b3a1 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -85,6 +85,7 @@ namespace qpid { void destroy(); // Called when mgmt deletes this link void ioThreadProcessing(); // Called on connection's IO thread by request bool tryFailover(); // Called during maintenance visit + bool hideManagement() const; public: typedef boost::shared_ptr<Link> shared_ptr; @@ -122,12 +123,12 @@ namespace qpid { void notifyConnectionForced(const std::string text); void setPassive(bool p); - + // PersistableConfig: void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } uint32_t encodedSize() const; - void encode(framing::Buffer& buffer) const; + void encode(framing::Buffer& buffer) const; const std::string& getName() const; static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); @@ -135,6 +136,7 @@ namespace qpid { // Manageable entry points management::ManagementObject* GetManagementObject(void) const; management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); + }; } } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index 7b1c75db74..e9885f5462 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -381,7 +381,7 @@ std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { return keystream.str(); } -void LinkRegistry::setPassive(bool p) +void LinkRegistry::setPassive(bool p) { Mutex::ScopedLock locker(lock); passiveChanged = p != passive; diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 26298393ff..727934ae26 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -282,6 +282,13 @@ acl allow all all qpid_tool.wait() scanner.join() assert scanner.found + # Regression test for https://issues.apache.org/jira/browse/QPID-3235 + # Inconsistent stats when changing elder. + + # Force a change of elder + cluster0.start() + cluster0[0].kill() + time.sleep(2) # Allow a management interval to pass. # Verify logs are consistent cluster_test_logs.verify_logs() @@ -602,7 +609,6 @@ acl allow all all send0.send("bar") # Should fail, exchange is deleted. self.fail("Expected not-found exception") except qpid.messaging.NotFound: pass - # FIXME aconway 2011-04-19: s0 is broken, new session self.assert_browse(cluster[0].connect().session(), "q", ["foo"]) def test_deleted_exchange_inconsistent(self): |