diff options
author | Ted Ross <tross@apache.org> | 2008-10-31 04:15:52 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-10-31 04:15:52 +0000 |
commit | 35a9035b749126f5de606c4a76299208fb6748ff (patch) | |
tree | 1dbc3220d6eca8bb262a5508d85df79ff65da4ee /cpp | |
parent | ea0d105aac7f505f67878c56e3230c3b35dfd8ae (diff) | |
download | qpid-python-35a9035b749126f5de606c4a76299208fb6748ff.tar.gz |
Federation bug-fixes:
1) Locking was added to protect the exchange's vector of bridges.
2) Bridges are now properly torn down when a link is lost.
3) Auto-tracing was improperly assigning tags to federation queues.
Also, the federation queue name now uses the broker-id for the destination
broker. This makes it easier to determine which queues go to which
brokers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@709342 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 4 |
4 files changed, 22 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index cc76cf7f21..8104907402 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -59,7 +59,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, link(_link), id(_id), args(_args), mgmtObject(0), listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0) { - queueName += name; + queueName += link->getBroker()->getFederationTag(); ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { mgmtObject = new _qmf::Bridge @@ -109,17 +109,17 @@ void Bridge::create(ConnectionState& c) if (args.i_tag.size()) { queueSettings.setString("qpid.trace.id", args.i_tag); } else { - const string& localTag = link->getBroker()->getFederationTag(); - if (localTag.size()) - queueSettings.setString("qpid.trace.id", localTag); + const string& peerTag = connState->getFederationPeerTag(); + if (peerTag.size()) + queueSettings.setString("qpid.trace.id", peerTag); } if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); } else { - const string& peerTag = connState->getFederationPeerTag(); - if (peerTag.size()) - queueSettings.setString("qpid.trace.exclude", peerTag); + const string& localTag = link->getBroker()->getFederationTag(); + if (localTag.size()) + queueSettings.setString("qpid.trace.exclude", localTag); } bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 442246d971..fe388c2fe9 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -30,6 +30,7 @@ using namespace qpid::broker; using namespace qpid::framing; using qpid::framing::Buffer; using qpid::framing::FieldTable; +using qpid::sys::Mutex; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -180,11 +181,15 @@ void Exchange::registerDynamicBridge(DynamicBridge* db) if (!supportsDynamicBinding()) throw Exception("Exchange type does not support dynamic binding"); - for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); - iter != bridgeVector.end(); iter++) - (*iter)->sendReorigin(); + { + Mutex::ScopedLock l(bridgeLock); + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + (*iter)->sendReorigin(); + + bridgeVector.push_back(db); + } - bridgeVector.push_back(db); FieldTable args; args.setString(qpidFedOp, fedOpReorigin); bind(Queue::shared_ptr(), string(), &args); @@ -192,6 +197,7 @@ void Exchange::registerDynamicBridge(DynamicBridge* db) void Exchange::removeDynamicBridge(DynamicBridge* db) { + Mutex::ScopedLock l(bridgeLock); for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); iter != bridgeVector.end(); iter++) if (*iter == db) { @@ -206,6 +212,7 @@ void Exchange::handleHelloRequest() void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin) { + Mutex::ScopedLock l(bridgeLock); string myOp(op.empty() ? fedOpBind : op); for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index d1b38ea8b6..50410b6e06 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -165,6 +165,7 @@ public: virtual bool supportsDynamicBinding() { return false; } protected: + qpid::sys::Mutex bridgeLock; std::vector<DynamicBridge*> bridgeVector; virtual void handleHelloRequest(); diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 5cd976ee08..4f9b8bc104 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -139,8 +139,10 @@ void Link::closed (int, std::string text) if (state == STATE_OPERATIONAL) QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); - for (Bridges::iterator i = active.begin(); i != active.end(); i++) + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->cancel(); created.push_back(*i); + } active.clear(); if (state != STATE_FAILED) |