summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-08-08 14:46:04 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-08-08 14:46:04 +0000
commit1f1e4b3168588648e8ee341e763250350d2be0cb (patch)
tree0751551b2bf662791eb498424290025a3f65be10
parentbb6d7afe7353fac9f0aa5503e00b4c46ebe5c64d (diff)
downloadqpid-python-1f1e4b3168588648e8ee341e763250350d2be0cb.tar.gz
QPID-4193: prevent race when connecting a multi-homed federated cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1370792 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp37
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp4
4 files changed, 19 insertions, 37 deletions
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index 0507fe6521..6f813554fa 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -119,6 +119,7 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
parent, failover));
if (durable && store) store->create(*link);
links[name] = link;
+ pendingLinks[name] = link;
QPID_LOG(debug, "Creating new link; name=" << name );
return std::pair<Link::shared_ptr, bool>(link, true);
}
@@ -229,6 +230,7 @@ void LinkRegistry::linkDestroyed(Link *link)
QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName());
Mutex::ScopedLock locker(lock);
+ pendingLinks.erase(link->getName());
LinkMap::iterator i = links.find(link->getName());
if (i != links.end())
{
@@ -322,10 +324,12 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
Link::shared_ptr link;
{
Mutex::ScopedLock locker(lock);
- for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+ for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) {
if (l->second->pendingConnection(host, port)) {
link = l->second;
+ pendingLinks.erase(l);
connections[key] = link->getName();
+ QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName());
break;
}
}
@@ -347,6 +351,10 @@ void LinkRegistry::notifyClosed(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (link) {
+ {
+ Mutex::ScopedLock locker(lock);
+ pendingLinks[link->getName()] = link;
+ }
link->closed(0, "Closed by peer");
}
}
@@ -355,6 +363,10 @@ void LinkRegistry::notifyConnectionForced(const std::string& key, const std::str
{
Link::shared_ptr link = findLink(key);
if (link) {
+ {
+ Mutex::ScopedLock locker(lock);
+ pendingLinks[link->getName()] = link;
+ }
link->notifyConnectionForced(text);
}
}
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 5a39b62bd1..076ab831c9 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -47,6 +47,7 @@ namespace broker {
LinkMap links; /** indexed by name of Link */
BridgeMap bridges; /** indexed by name of Bridge */
ConnectionMap connections; /** indexed by connection identifier, gives link name */
+ LinkMap pendingLinks; /** pending connection, indexed by name of Link */
qpid::sys::Mutex lock;
Broker* broker;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index ff855eef18..6a219072e4 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -808,48 +808,19 @@ void Connection::config(const std::string& encoded) {
else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
}
-namespace {
- // find a Link that matches the given Address
- class LinkFinder {
- qpid::Address id;
- boost::shared_ptr<broker::Link> link;
- public:
- LinkFinder(const qpid::Address& _id) : id(_id) {}
- boost::shared_ptr<broker::Link> getLink() { return link; }
- void operator() (boost::shared_ptr<broker::Link> l)
- {
- if (!link) {
- qpid::Address addr(l->getTransport(), l->getHost(), l->getPort());
- if (id == addr) {
- link = l;
- }
- }
- }
- };
-}
-
void Connection::internalState(const std::string& type,
const std::string& name,
const framing::FieldTable& state)
{
if (type == "link") {
- // name is the string representation of the Link's _configured_ destination address
- Url dest;
- try {
- dest = name;
- } catch(...) {
- throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name));
- }
- assert(dest.size());
- LinkFinder finder(dest[0]);
- cluster.getBroker().getLinks().eachLink(boost::ref(finder));
- if (finder.getLink()) {
+ boost::shared_ptr<qpid::broker::Link> link(cluster.getBroker().getLinks().getLink(name));
+ if (link.get()) {
try {
- finder.getLink()->setState(state);
+ link->setState(state);
} catch(...) {
throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state));
}
- QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state);
+ QPID_LOG(debug, cluster << " updated link " << name << " with state: " << state);
} else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name));
}
else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type));
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 8737418570..f9aa8b804d 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -690,10 +690,8 @@ void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
// now push the current state
framing::FieldTable state;
link->getState(state);
- std::ostringstream os;
- os << qpid::Address(link->getTransport(), link->getHost(), link->getPort());
ClusterConnectionProxy(session).internalState(std::string("link"),
- os.str(),
+ link->getName(),
state);
}