summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Exchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp56
1 files changed, 52 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 3cea904676..f437946194 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -40,6 +40,14 @@ namespace
{
const std::string qpidMsgSequence("qpid.msg_sequence");
const std::string qpidIVE("qpid.ive");
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
}
@@ -73,7 +81,7 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0)
+ sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -89,7 +97,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
: name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
- sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
+ sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -107,7 +115,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
}
}
}
-
+
sequence = _args.get(qpidMsgSequence);
if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
@@ -166,8 +174,46 @@ ManagementObject* Exchange::GetManagementObject (void) const
return (ManagementObject*) mgmtExchange;
}
+void Exchange::registerDynamicBridge(DynamicBridge* db)
+{
+ if (!supportsDynamicBinding())
+ throw Exception("Exchange type does not support dynamic binding");
+
+ for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+ iter != bridgeVector.end(); iter++)
+ (*iter)->sendReorigin();
+
+ bridgeVector.push_back(db);
+ FieldTable args;
+ args.setString(qpidFedOp, fedOpReorigin);
+ bind(Queue::shared_ptr(), string(), &args);
+}
+
+void Exchange::removeDynamicBridge(DynamicBridge* db)
+{
+ for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+ iter != bridgeVector.end(); iter++)
+ if (*iter == db) {
+ bridgeVector.erase(iter);
+ break;
+ }
+}
+
+void Exchange::handleHelloRequest()
+{
+}
+
+void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin)
+{
+ string myOp(op.empty() ? fedOpBind : op);
+
+ for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+ iter != bridgeVector.end(); iter++)
+ (*iter)->propagateBinding(routingKey, tags, op, origin);
+}
+
Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent,
- FieldTable _args)
+ FieldTable _args, const string& origin)
: queue(_queue), key(_key), args(_args), mgmtBinding(0)
{
if (parent != 0)
@@ -181,6 +227,8 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang
management::ObjectId queueId = mo->getObjectId();
mgmtBinding = new _qmf::Binding
(agent, this, (Manageable*) parent, queueId, key, args);
+ if (!origin.empty())
+ mgmtBinding->set_origin(origin);
agent->addObject (mgmtBinding);
}
}