diff options
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 56 |
1 files changed, 52 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 3cea904676..f437946194 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -40,6 +40,14 @@ namespace { const std::string qpidMsgSequence("qpid.msg_sequence"); const std::string qpidIVE("qpid.ive"); +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); } @@ -73,7 +81,7 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0) + sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -89,7 +97,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), - sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) + sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -107,7 +115,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } } } - + sequence = _args.get(qpidMsgSequence); if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); @@ -166,8 +174,46 @@ ManagementObject* Exchange::GetManagementObject (void) const return (ManagementObject*) mgmtExchange; } +void Exchange::registerDynamicBridge(DynamicBridge* db) +{ + if (!supportsDynamicBinding()) + throw Exception("Exchange type does not support dynamic binding"); + + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + (*iter)->sendReorigin(); + + bridgeVector.push_back(db); + FieldTable args; + args.setString(qpidFedOp, fedOpReorigin); + bind(Queue::shared_ptr(), string(), &args); +} + +void Exchange::removeDynamicBridge(DynamicBridge* db) +{ + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + if (*iter == db) { + bridgeVector.erase(iter); + break; + } +} + +void Exchange::handleHelloRequest() +{ +} + +void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin) +{ + string myOp(op.empty() ? fedOpBind : op); + + for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); + iter != bridgeVector.end(); iter++) + (*iter)->propagateBinding(routingKey, tags, op, origin); +} + Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent, - FieldTable _args) + FieldTable _args, const string& origin) : queue(_queue), key(_key), args(_args), mgmtBinding(0) { if (parent != 0) @@ -181,6 +227,8 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang management::ObjectId queueId = mo->getObjectId(); mgmtBinding = new _qmf::Binding (agent, this, (Manageable*) parent, queueId, key, args); + if (!origin.empty()) + mgmtBinding->set_origin(origin); agent->addObject (mgmtBinding); } } |