summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-31 04:15:52 +0000
committerTed Ross <tross@apache.org>2008-10-31 04:15:52 +0000
commit35a9035b749126f5de606c4a76299208fb6748ff (patch)
tree1dbc3220d6eca8bb262a5508d85df79ff65da4ee /cpp
parentea0d105aac7f505f67878c56e3230c3b35dfd8ae (diff)
downloadqpid-python-35a9035b749126f5de606c4a76299208fb6748ff.tar.gz
Federation bug-fixes:
1) Locking was added to protect the exchange's vector of bridges. 2) Bridges are now properly torn down when a link is lost. 3) Auto-tracing was improperly assigning tags to federation queues. Also, the federation queue name now uses the broker-id for the destination broker. This makes it easier to determine which queues go to which brokers. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@709342 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp14
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp15
-rw-r--r--cpp/src/qpid/broker/Exchange.h1
-rw-r--r--cpp/src/qpid/broker/Link.cpp4
4 files changed, 22 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index cc76cf7f21..8104907402 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -59,7 +59,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
link(_link), id(_id), args(_args), mgmtObject(0),
listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
{
- queueName += name;
+ queueName += link->getBroker()->getFederationTag();
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
@@ -109,17 +109,17 @@ void Bridge::create(ConnectionState& c)
if (args.i_tag.size()) {
queueSettings.setString("qpid.trace.id", args.i_tag);
} else {
- const string& localTag = link->getBroker()->getFederationTag();
- if (localTag.size())
- queueSettings.setString("qpid.trace.id", localTag);
+ const string& peerTag = connState->getFederationPeerTag();
+ if (peerTag.size())
+ queueSettings.setString("qpid.trace.id", peerTag);
}
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
} else {
- const string& peerTag = connState->getFederationPeerTag();
- if (peerTag.size())
- queueSettings.setString("qpid.trace.exclude", peerTag);
+ const string& localTag = link->getBroker()->getFederationTag();
+ if (localTag.size())
+ queueSettings.setString("qpid.trace.exclude", localTag);
}
bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 442246d971..fe388c2fe9 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -30,6 +30,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
+using qpid::sys::Mutex;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -180,11 +181,15 @@ void Exchange::registerDynamicBridge(DynamicBridge* db)
if (!supportsDynamicBinding())
throw Exception("Exchange type does not support dynamic binding");
- for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
- iter != bridgeVector.end(); iter++)
- (*iter)->sendReorigin();
+ {
+ Mutex::ScopedLock l(bridgeLock);
+ for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+ iter != bridgeVector.end(); iter++)
+ (*iter)->sendReorigin();
+
+ bridgeVector.push_back(db);
+ }
- bridgeVector.push_back(db);
FieldTable args;
args.setString(qpidFedOp, fedOpReorigin);
bind(Queue::shared_ptr(), string(), &args);
@@ -192,6 +197,7 @@ void Exchange::registerDynamicBridge(DynamicBridge* db)
void Exchange::removeDynamicBridge(DynamicBridge* db)
{
+ Mutex::ScopedLock l(bridgeLock);
for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
iter != bridgeVector.end(); iter++)
if (*iter == db) {
@@ -206,6 +212,7 @@ void Exchange::handleHelloRequest()
void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin)
{
+ Mutex::ScopedLock l(bridgeLock);
string myOp(op.empty() ? fedOpBind : op);
for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index d1b38ea8b6..50410b6e06 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -165,6 +165,7 @@ public:
virtual bool supportsDynamicBinding() { return false; }
protected:
+ qpid::sys::Mutex bridgeLock;
std::vector<DynamicBridge*> bridgeVector;
virtual void handleHelloRequest();
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 5cd976ee08..4f9b8bc104 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -139,8 +139,10 @@ void Link::closed (int, std::string text)
if (state == STATE_OPERATIONAL)
QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
- for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->cancel();
created.push_back(*i);
+ }
active.clear();
if (state != STATE_FAILED)