summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp84
-rw-r--r--cpp/src/qpid/broker/Bridge.h11
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp102
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h46
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp56
-rw-r--r--cpp/src/qpid/broker/Exchange.h51
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp59
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h2
-rw-r--r--cpp/src/qpid/broker/Link.cpp20
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp78
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h10
11 files changed, 430 insertions, 89 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 5064320efb..cc76cf7f21 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -34,6 +34,18 @@ using qpid::framing::Buffer;
using qpid::management::ManagementAgent;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
namespace qpid {
namespace broker {
@@ -45,8 +57,9 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const _qmf::ArgsLinkBridge& _args) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), persistenceId(0)
+ listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
{
+ queueName += name;
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
@@ -65,7 +78,10 @@ Bridge::~Bridge()
void Bridge::create(ConnectionState& c)
{
+ connState = &c;
if (args.i_srcIsLocal) {
+ if (args.i_dynamic)
+ throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
Connection* conn = dynamic_cast<Connection*>(&c);
if (conn == 0)
@@ -74,7 +90,7 @@ void Bridge::create(ConnectionState& c)
channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
} else {
// Point the bridging commands at the remote peer broker
- channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ channelHandler.reset(new framing::ChannelHandler(id, &(connState->getOutput())));
}
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
@@ -88,8 +104,6 @@ void Bridge::create(ConnectionState& c)
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
- string queue = "bridge_queue_";
- queue += Uuid(true).str();
FieldTable queueSettings;
if (args.i_tag.size()) {
@@ -103,19 +117,26 @@ void Bridge::create(ConnectionState& c)
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
} else {
- const string& peerTag = c.getFederationPeerTag();
+ const string& peerTag = connState->getFederationPeerTag();
if (peerTag.size())
queueSettings.setString("qpid.trace.exclude", peerTag);
}
bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
- peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+ peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings);
if (!args.i_dynamic)
- peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
- peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+ if (exchange.get() == 0)
+ throw Exception("Exchange not found for dynamic route");
+ exchange->registerDynamicBridge(this);
+ }
}
}
@@ -123,6 +144,11 @@ void Bridge::cancel()
{
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(name);
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+ if (exchange.get() != 0)
+ exchange->removeDynamicBridge(this);
+ }
}
void Bridge::destroy()
@@ -220,4 +246,46 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
}
}
+void Bridge::propagateBinding(const string& key, const string& tagList,
+ const string& op, const string& origin)
+{
+ const string& localTag = link->getBroker()->getFederationTag();
+ const string& peerTag = connState->getFederationPeerTag();
+
+ if (tagList.find(peerTag) == tagList.npos) {
+ FieldTable bindArgs;
+ string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag);
+
+ bindArgs.setString(qpidFedOp, op);
+ bindArgs.setString(qpidFedTags, newTagList);
+ if (origin.empty())
+ bindArgs.setString(qpidFedOrigin, localTag);
+ else
+ bindArgs.setString(qpidFedOrigin, origin);
+
+ peer->getExchange().bind(queueName, args.i_src, key, bindArgs);
+ }
+}
+
+void Bridge::sendReorigin()
+{
+ FieldTable bindArgs;
+
+ bindArgs.setString(qpidFedOp, fedOpReorigin);
+ bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
+
+ peer->getExchange().bind(queueName, args.i_src, args.i_key, bindArgs);
+}
+
+bool Bridge::containsLocalTag(const string& tagList) const
+{
+ const string& localTag = link->getBroker()->getFederationTag();
+ return (tagList.find(localTag) != tagList.npos);
+}
+
+const string& Bridge::getLocalTag() const
+{
+ return link->getBroker()->getFederationTag();
+}
+
}}
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index 057bc68fe2..c530a5d696 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -27,6 +27,7 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/management/Manageable.h"
+#include "Exchange.h"
#include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
#include "qmf/org/apache/qpid/broker/Bridge.h"
@@ -41,7 +42,7 @@ class ConnectionState;
class Link;
class LinkRegistry;
-class Bridge : public PersistableConfig, public management::Manageable
+class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
@@ -69,6 +70,12 @@ public:
const std::string& getName() const;
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ // Exchange::DynamicBridge methods
+ void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin);
+ void sendReorigin();
+ bool containsLocalTag(const std::string& tagList) const;
+ const std::string& getLocalTag() const;
+
private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
@@ -87,7 +94,9 @@ private:
qmf::org::apache::qpid::broker::Bridge* mgmtObject;
CancellationListener listener;
std::string name;
+ std::string queueName;
mutable uint64_t persistenceId;
+ ConnectionState* connState;
};
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 8fc8260f9e..9976167fa9 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -28,25 +28,45 @@ using namespace qpid::sys;
using qpid::management::Manageable;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
{
if (mgmtExchange != 0)
- mgmtExchange->set_type (typeName);
+ mgmtExchange->set_type(typeName);
}
-DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+DirectExchange::DirectExchange(const string& _name, bool _durable,
const FieldTable& _args, Manageable* _parent) :
Exchange(_name, _durable, _args, _parent)
{
if (mgmtExchange != 0)
- mgmtExchange->set_type (typeName);
+ mgmtExchange->set_type(typeName);
}
-bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
- {
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+ string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+ string fedTags(args ? args->getAsString(qpidFedTags) : "");
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+ bool propagate = false;
+
+ if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
Mutex::ScopedLock l(lock);
- Binding::shared_ptr b(new Binding (routingKey, queue, this));
- if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
+ Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin));
+ BoundKey& bk = bindings[routingKey];
+ if (bk.queues.add_unless(b, MatchQueue(queue))) {
+ propagate = bk.fedBinding.addOrigin(fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -54,30 +74,58 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
} else {
return false;
}
+ } else if (fedOp == fedOpUnbind) {
+ Mutex::ScopedLock l(lock);
+ BoundKey& bk = bindings[routingKey];
+ propagate = bk.fedBinding.delOrigin(fedOrigin);
+ if (bk.fedBinding.count() == 0)
+ unbind(queue, routingKey, 0);
+ } else if (fedOp == fedOpReorigin) {
+ for (std::map<string, BoundKey>::iterator iter = bindings.begin();
+ iter != bindings.end(); iter++) {
+ const BoundKey& bk = iter->second;
+ if (bk.fedBinding.hasLocal()) {
+ propagateFedOp(iter->first, string(), fedOpBind, string());
+ }
+ }
}
+
routeIVE();
+ if (propagate)
+ propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
return true;
}
-bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- Mutex::ScopedLock l(lock);
- if (bindings[routingKey].remove_if(MatchQueue(queue))) {
- if (mgmtExchange != 0) {
- mgmtExchange->dec_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+{
+ bool propagate = false;
+
+ {
+ Mutex::ScopedLock l(lock);
+ BoundKey& bk = bindings[routingKey];
+ if (bk.queues.remove_if(MatchQueue(queue))) {
+ propagate = bk.fedBinding.delOrigin();
+ if (mgmtExchange != 0) {
+ mgmtExchange->dec_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
+ }
+ } else {
+ return false;
}
- return true;
- } else {
- return false;
}
+
+ if (propagate)
+ propagateFedOp(routingKey, string(), fedOpUnbind, string());
+ return true;
}
-void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
PreRoute pr(msg, this);
Queues::ConstPtr p;
{
Mutex::ScopedLock l(lock);
- p = bindings[routingKey].snapshot();
+ p = bindings[routingKey].queues.snapshot();
}
int count(0);
@@ -85,26 +133,26 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, const Fie
for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
msg.deliverTo((*i)->queue);
if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ (*i)->mgmtBinding->inc_msgMatched();
}
}
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
+ mgmtExchange->inc_msgDrops();
+ mgmtExchange->inc_byteDrops(msg.contentSize());
}
} else {
if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ mgmtExchange->inc_msgRoutes(count);
+ mgmtExchange->inc_byteRoutes(count * msg.contentSize());
}
}
if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
+ mgmtExchange->inc_msgReceives();
+ mgmtExchange->inc_byteReceives(msg.contentSize());
}
}
@@ -120,14 +168,14 @@ bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routin
if (!queue)
return true;
- Queues::ConstPtr p = i->second.snapshot();
+ Queues::ConstPtr p = i->second.queues.snapshot();
return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
- Queues::ConstPtr p = i->second.snapshot();
+ Queues::ConstPtr p = i->second.queues.snapshot();
if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
}
return false;
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 2516ce4a13..ba60469df8 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -31,33 +31,35 @@
namespace qpid {
namespace broker {
- class DirectExchange : public virtual Exchange{
- typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues;
- typedef std::map<string, Queues> Bindings;
- Bindings bindings;
- qpid::sys::Mutex lock;
-
- public:
- static const std::string typeName;
-
- DirectExchange(const std::string& name, management::Manageable* parent = 0);
- DirectExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
+class DirectExchange : public virtual Exchange {
+ typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues;
+ struct BoundKey {
+ Queues queues;
+ FedBinding fedBinding;
+ };
+ typedef std::map<string, BoundKey> Bindings;
+ Bindings bindings;
+ qpid::sys::Mutex lock;
- virtual std::string getType() const { return typeName; }
+public:
+ static const std::string typeName;
- virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ DirectExchange(const std::string& name, management::Manageable* parent = 0);
+ DirectExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
- virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual std::string getType() const { return typeName; }
+
+ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
- virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+ virtual ~DirectExchange();
- virtual ~DirectExchange();
- };
-}
-}
+ virtual bool supportsDynamicBinding() { return true; }
+};
+}}
#endif
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 3cea904676..f437946194 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -40,6 +40,14 @@ namespace
{
const std::string qpidMsgSequence("qpid.msg_sequence");
const std::string qpidIVE("qpid.ive");
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
}
@@ -73,7 +81,7 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0)
+ sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -89,7 +97,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
: name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
- sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
+ sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -107,7 +115,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
}
}
}
-
+
sequence = _args.get(qpidMsgSequence);
if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
@@ -166,8 +174,46 @@ ManagementObject* Exchange::GetManagementObject (void) const
return (ManagementObject*) mgmtExchange;
}
+void Exchange::registerDynamicBridge(DynamicBridge* db)
+{
+ if (!supportsDynamicBinding())
+ throw Exception("Exchange type does not support dynamic binding");
+
+ for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+ iter != bridgeVector.end(); iter++)
+ (*iter)->sendReorigin();
+
+ bridgeVector.push_back(db);
+ FieldTable args;
+ args.setString(qpidFedOp, fedOpReorigin);
+ bind(Queue::shared_ptr(), string(), &args);
+}
+
+void Exchange::removeDynamicBridge(DynamicBridge* db)
+{
+ for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+ iter != bridgeVector.end(); iter++)
+ if (*iter == db) {
+ bridgeVector.erase(iter);
+ break;
+ }
+}
+
+void Exchange::handleHelloRequest()
+{
+}
+
+void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin)
+{
+ string myOp(op.empty() ? fedOpBind : op);
+
+ for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+ iter != bridgeVector.end(); iter++)
+ (*iter)->propagateBinding(routingKey, tags, op, origin);
+}
+
Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent,
- FieldTable _args)
+ FieldTable _args, const string& origin)
: queue(_queue), key(_key), args(_args), mgmtBinding(0)
{
if (parent != 0)
@@ -181,6 +227,8 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang
management::ObjectId queueId = mo->getObjectId();
mgmtBinding = new _qmf::Binding
(agent, this, (Manageable*) parent, queueId, key, args);
+ if (!origin.empty())
+ mgmtBinding->set_origin(origin);
agent->addObject (mgmtBinding);
}
}
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 1963ba1134..d1b38ea8b6 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -74,7 +74,7 @@ protected:
qmf::org::apache::qpid::broker::Binding* mgmtBinding;
Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
- framing::FieldTable args = framing::FieldTable());
+ framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
~Binding();
management::ManagementObject* GetManagementObject() const;
};
@@ -85,6 +85,34 @@ protected:
bool operator()(Exchange::Binding::shared_ptr b);
};
+ class FedBinding {
+ uint32_t localBindings;
+ std::set<std::string> originSet;
+ public:
+ FedBinding() : localBindings(0) {}
+ bool hasLocal() const { return localBindings != 0; }
+ bool addOrigin(const std::string& origin) {
+ if (origin.empty()) {
+ localBindings++;
+ return localBindings == 1;
+ }
+ originSet.insert(origin);
+ return true;
+ }
+ bool delOrigin(const std::string& origin) {
+ originSet.erase(origin);
+ return true;
+ }
+ bool delOrigin() {
+ if (localBindings > 0)
+ localBindings--;
+ return localBindings == 0;
+ }
+ uint32_t count() {
+ return localBindings + originSet.size();
+ }
+ };
+
qmf::org::apache::qpid::broker::Exchange* mgmtExchange;
public:
@@ -121,6 +149,27 @@ public:
// Manageable entry points
management::ManagementObject* GetManagementObject(void) const;
+
+ // Federation hooks
+ class DynamicBridge {
+ public:
+ virtual ~DynamicBridge() {}
+ virtual void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin) = 0;
+ virtual void sendReorigin() = 0;
+ virtual bool containsLocalTag(const std::string& tagList) const = 0;
+ virtual const std::string& getLocalTag() const = 0;
+ };
+
+ void registerDynamicBridge(DynamicBridge* db);
+ void removeDynamicBridge(DynamicBridge* db);
+ virtual bool supportsDynamicBinding() { return false; }
+
+protected:
+ std::vector<DynamicBridge*> bridgeVector;
+
+ virtual void handleHelloRequest();
+ void propagateFedOp(const std::string& routingKey, const std::string& tags,
+ const std::string& op, const std::string& origin);
};
}}
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 2628d8952f..aa1f7ff30a 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -26,6 +26,18 @@ using namespace qpid::framing;
using namespace qpid::sys;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) :
Exchange(_name, _parent)
{
@@ -41,32 +53,57 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args)
{
- Binding::shared_ptr binding (new Binding ("", queue, this));
- if (bindings.add_unless(binding, MatchQueue(queue))) {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+ string fedTags(args ? args->getAsString(qpidFedTags) : "");
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+ bool propagate = false;
+
+ if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
+ Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
+ if (bindings.add_unless(binding, MatchQueue(queue))) {
+ propagate = fedBinding.addOrigin(fedOrigin);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ }
+ } else {
+ return false;
+ }
+ } else if (fedOp == fedOpUnbind) {
+ propagate = fedBinding.delOrigin(fedOrigin);
+ if (fedBinding.count() == 0)
+ unbind(queue, "", 0);
+ } else if (fedOp == fedOpReorigin) {
+ if (fedBinding.hasLocal()) {
+ propagateFedOp(string(), string(), fedOpBind, string());
}
- routeIVE();
- return true;
- } else {
- return false;
}
+
+ routeIVE();
+ if (propagate)
+ propagateFedOp(string(), fedTags, fedOp, fedOrigin);
+ return true;
}
bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
{
+ bool propagate = false;
+
if (bindings.remove_if(MatchQueue(queue))) {
+ propagate = fedBinding.delOrigin();
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
}
- return true;
} else {
return false;
}
+
+ if (propagate)
+ propagateFedOp(string(), string(), fedOpUnbind, string());
+ return true;
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index cfe9875024..5884a19732 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -34,6 +34,7 @@ namespace broker {
class FanOutExchange : public virtual Exchange {
typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> BindingsArray;
BindingsArray bindings;
+ FedBinding fedBinding;
public:
static const std::string typeName;
@@ -53,6 +54,7 @@ class FanOutExchange : public virtual Exchange {
virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
virtual ~FanOutExchange();
+ virtual bool supportsDynamicBinding() { return true; }
};
}
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 7a06d7e0b9..a814c12eed 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -336,7 +336,7 @@ ManagementObject* Link::GetManagementObject (void) const
return (ManagementObject*) mgmtObject;
}
-Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args, string&)
+Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args, string& text)
{
switch (op)
{
@@ -350,8 +350,22 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
// Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable)
- return Manageable::STATUS_INVALID_PARAMETER;
+ if (iargs.i_durable && !durable) {
+ text = "Can't create a durable route on a non-durable link";
+ return Manageable::STATUS_USER;
+ }
+
+ if (iargs.i_dynamic) {
+ Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
+ if (exchange.get() == 0) {
+ text = "Exchange not found";
+ return Manageable::STATUS_USER;
+ }
+ if (!exchange->supportsDynamicBinding()) {
+ text = "Exchange type does not support dynamic routing";
+ return Manageable::STATUS_USER;
+ }
+ }
std::pair<Bridge::shared_ptr, bool> result =
links->declare (host, port, iargs.i_durable, iargs.i_src,
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 691b42a1ae..853c131571 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -31,6 +31,18 @@ namespace _qmf = qmf::org::apache::qpid::broker;
// - excessive string copying: should be 0 copy, match from original buffer.
// - match/lookup: use descision tree or other more efficient structure.
+namespace
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
Tokens& Tokens::operator=(const std::string& s) {
clear();
if (s.empty()) return *this;
@@ -51,6 +63,15 @@ TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
return *this;
}
+void Tokens::key(string& keytext) const
+{
+ for (std::vector<string>::const_iterator iter = begin(); iter != end(); iter++) {
+ if (iter != begin())
+ keytext += ".";
+ keytext += *iter;
+ }
+}
+
namespace {
const std::string hashmark("#");
const std::string star("*");
@@ -81,7 +102,7 @@ void TopicPattern::normalize() {
namespace {
-// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
+// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string.
// Need StringRef class that operates on a string in place witout copy.
// Should be applied everywhere strings are extracted from frames.
//
@@ -130,30 +151,63 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- {
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+ string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+ string fedTags(args ? args->getAsString(qpidFedTags) : "");
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+ bool propagate = false;
+ bool reallyUnbind;
+ TopicPattern routingPattern(routingKey);
+
+ if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
RWlock::ScopedWlock l(lock);
- TopicPattern routingPattern(routingKey);
if (isBound(queue, routingPattern)) {
return false;
} else {
- Binding::shared_ptr binding (new Binding (routingKey, queue, this));
- bindings[routingPattern].push_back(binding);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this, FieldTable(), fedOrigin));
+ BoundKey& bk = bindings[routingPattern];
+ bk.bindingVector.push_back(binding);
+ propagate = bk.fedBinding.addOrigin(fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
}
+ } else if (fedOp == fedOpUnbind) {
+ {
+ RWlock::ScopedWlock l(lock);
+ BoundKey& bk = bindings[routingPattern];
+ propagate = bk.fedBinding.delOrigin(fedOrigin);
+ reallyUnbind = bk.fedBinding.count() == 0;
+ }
+ if (reallyUnbind)
+ unbind(queue, routingKey, 0);
+ } else if (fedOp == fedOpReorigin) {
+ for (std::map<TopicPattern, BoundKey>::iterator iter = bindings.begin();
+ iter != bindings.end(); iter++) {
+ const BoundKey& bk = iter->second;
+ if (bk.fedBinding.hasLocal()) {
+ string propKey;
+ iter->first.key(propKey);
+ propagateFedOp(propKey, string(), fedOpBind, string());
+ }
+ }
}
+
routeIVE();
+ if (propagate)
+ propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
return true;
}
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Binding::vector& qv(bi->second);
if (bi == bindings.end()) return false;
+ BoundKey& bk = bi->second;
+ Binding::vector& qv(bk.bindingVector);
+ bool propagate = false;
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
@@ -161,11 +215,15 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co
break;
if(q == qv.end()) return false;
qv.erase(q);
+ propagate = bk.fedBinding.delOrigin();
if(qv.empty()) bindings.erase(bi);
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
}
+
+ if (propagate)
+ propagateFedOp(routingKey, string(), fedOpUnbind, string());
return true;
}
@@ -173,7 +231,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
{
BindingMap::iterator bi = bindings.find(pattern);
if (bi == bindings.end()) return false;
- Binding::vector& qv(bi->second);
+ Binding::vector& qv(bi->second.bindingVector);
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
if ((*q)->queue == queue)
@@ -189,7 +247,7 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(tokens)) {
- Binding::vector& qv(i->second);
+ Binding::vector& qv(i->second.bindingVector);
for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
msg.deliverTo((*j)->queue);
if ((*j)->mgmtBinding != 0)
@@ -230,7 +288,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing
}
} else {
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- Binding::vector& qv(i->second);
+ Binding::vector& qv(i->second.bindingVector);
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
if ((*q)->queue == queue)
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 2e107142b7..f3a2e221f7 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -41,6 +41,7 @@ class Tokens : public std::vector<std::string> {
Tokens(const std::string& s) { operator=(s); }
/** Tokenizing assignment operator s */
Tokens & operator=(const std::string& s);
+ void key(std::string& key) const;
private:
size_t hash;
@@ -70,8 +71,12 @@ class TopicPattern : public Tokens
void normalize();
};
-class TopicExchange : public virtual Exchange{
- typedef std::map<TopicPattern, Binding::vector> BindingMap;
+class TopicExchange : public virtual Exchange {
+ struct BoundKey {
+ Binding::vector bindingVector;
+ FedBinding fedBinding;
+ };
+ typedef std::map<TopicPattern, BoundKey> BindingMap;
BindingMap bindings;
qpid::sys::RWlock lock;
@@ -94,6 +99,7 @@ class TopicExchange : public virtual Exchange{
virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
virtual ~TopicExchange();
+ virtual bool supportsDynamicBinding() { return true; }
};