diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-08-08 14:46:04 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-08-08 14:46:04 +0000 |
commit | 1f1e4b3168588648e8ee341e763250350d2be0cb (patch) | |
tree | 0751551b2bf662791eb498424290025a3f65be10 | |
parent | bb6d7afe7353fac9f0aa5503e00b4c46ebe5c64d (diff) | |
download | qpid-python-1f1e4b3168588648e8ee341e763250350d2be0cb.tar.gz |
QPID-4193: prevent race when connecting a multi-homed federated cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1370792 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 37 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 4 |
4 files changed, 19 insertions, 37 deletions
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index 0507fe6521..6f813554fa 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -119,6 +119,7 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, parent, failover)); if (durable && store) store->create(*link); links[name] = link; + pendingLinks[name] = link; QPID_LOG(debug, "Creating new link; name=" << name ); return std::pair<Link::shared_ptr, bool>(link, true); } @@ -229,6 +230,7 @@ void LinkRegistry::linkDestroyed(Link *link) QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName()); Mutex::ScopedLock locker(lock); + pendingLinks.erase(link->getName()); LinkMap::iterator i = links.find(link->getName()); if (i != links.end()) { @@ -322,10 +324,12 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c) Link::shared_ptr link; { Mutex::ScopedLock locker(lock); - for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) { + for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) { if (l->second->pendingConnection(host, port)) { link = l->second; + pendingLinks.erase(l); connections[key] = link->getName(); + QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName()); break; } } @@ -347,6 +351,10 @@ void LinkRegistry::notifyClosed(const std::string& key) { Link::shared_ptr link = findLink(key); if (link) { + { + Mutex::ScopedLock locker(lock); + pendingLinks[link->getName()] = link; + } link->closed(0, "Closed by peer"); } } @@ -355,6 +363,10 @@ void LinkRegistry::notifyConnectionForced(const std::string& key, const std::str { Link::shared_ptr link = findLink(key); if (link) { + { + Mutex::ScopedLock locker(lock); + pendingLinks[link->getName()] = link; + } link->notifyConnectionForced(text); } } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index 5a39b62bd1..076ab831c9 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -47,6 +47,7 @@ namespace broker { LinkMap links; /** indexed by name of Link */ BridgeMap bridges; /** indexed by name of Bridge */ ConnectionMap connections; /** indexed by connection identifier, gives link name */ + LinkMap pendingLinks; /** pending connection, indexed by name of Link */ qpid::sys::Mutex lock; Broker* broker; diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index ff855eef18..6a219072e4 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -808,48 +808,19 @@ void Connection::config(const std::string& encoded) { else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); } -namespace { - // find a Link that matches the given Address - class LinkFinder { - qpid::Address id; - boost::shared_ptr<broker::Link> link; - public: - LinkFinder(const qpid::Address& _id) : id(_id) {} - boost::shared_ptr<broker::Link> getLink() { return link; } - void operator() (boost::shared_ptr<broker::Link> l) - { - if (!link) { - qpid::Address addr(l->getTransport(), l->getHost(), l->getPort()); - if (id == addr) { - link = l; - } - } - } - }; -} - void Connection::internalState(const std::string& type, const std::string& name, const framing::FieldTable& state) { if (type == "link") { - // name is the string representation of the Link's _configured_ destination address - Url dest; - try { - dest = name; - } catch(...) { - throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name)); - } - assert(dest.size()); - LinkFinder finder(dest[0]); - cluster.getBroker().getLinks().eachLink(boost::ref(finder)); - if (finder.getLink()) { + boost::shared_ptr<qpid::broker::Link> link(cluster.getBroker().getLinks().getLink(name)); + if (link.get()) { try { - finder.getLink()->setState(state); + link->setState(state); } catch(...) { throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state)); } - QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state); + QPID_LOG(debug, cluster << " updated link " << name << " with state: " << state); } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name)); } else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type)); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 8737418570..f9aa8b804d 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -690,10 +690,8 @@ void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) { // now push the current state framing::FieldTable state; link->getState(state); - std::ostringstream os; - os << qpid::Address(link->getTransport(), link->getHost(), link->getPort()); ClusterConnectionProxy(session).internalState(std::string("link"), - os.str(), + link->getName(), state); } |