diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/AddressResolution.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 60 |
1 files changed, 15 insertions, 45 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 16e5fde075..f1295a3b66 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -129,10 +129,6 @@ const std::string HEADERS_EXCHANGE("headers"); const std::string XML_EXCHANGE("xml"); const std::string WILDCARD_ANY("#"); -//exchange prefixes: -const std::string PREFIX_AMQ("amq."); -const std::string PREFIX_QPID("qpid."); - const Verifier verifier; } @@ -203,7 +199,6 @@ class Exchange : protected Node void checkCreate(qpid::client::AsyncSession&, CheckMode); void checkAssert(qpid::client::AsyncSession&, CheckMode); void checkDelete(qpid::client::AsyncSession&, CheckMode); - bool isReservedName(); protected: const std::string specifiedType; @@ -238,8 +233,6 @@ class Subscription : public Exchange, public MessageSource const bool reliable; const bool durable; const std::string actualType; - const bool exclusiveQueue; - const bool exclusiveSubscription; FieldTable queueOptions; FieldTable subscriptionOptions; Bindings bindings; @@ -314,7 +307,6 @@ struct Opt Opt& operator/(const std::string& name); operator bool() const; std::string str() const; - bool asBool(bool defaultValue) const; const Variant::List& asList() const; void collect(qpid::framing::FieldTable& args) const; @@ -346,12 +338,6 @@ Opt::operator bool() const return value && !value->isVoid() && value->asBool(); } -bool Opt::asBool(bool defaultValue) const -{ - if (value) return value->asBool(); - else return defaultValue; -} - std::string Opt::str() const { if (value) return value->asString(); @@ -495,7 +481,7 @@ std::string Subscription::getSubscriptionName(const std::string& base, const std if (name.empty()) { return (boost::format("%1%_%2%") % base % Uuid(true).str()).str(); } else { - return name; + return (boost::format("%1%_%2%") % base % name).str(); } } @@ -504,9 +490,7 @@ Subscription::Subscription(const Address& address, const std::string& type) queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())), reliable(AddressResolution::is_reliable(address)), durable(Opt(address)/LINK/DURABLE), - actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), - exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), - exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)) + actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type) { (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); @@ -566,7 +550,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str checkAssert(session, FOR_RECEIVER); //create subscription queue: - session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, + session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions); //'default' binding: bindings.bind(session); @@ -575,15 +559,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str linkBindings.bind(session); //subscribe to subscription queue: AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; - session.messageSubscribe(arg::queue=queue, arg::destination=destination, - arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions); + session.messageSubscribe(arg::queue=queue, arg::destination=destination, + arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions); } void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination) { linkBindings.unbind(session); session.messageCancel(destination); - if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true); + session.queueDelete(arg::queue=queue); checkDelete(session, FOR_RECEIVER); } @@ -777,32 +761,18 @@ Exchange::Exchange(const Address& a) : Node(a), linkBindings.setDefaultExchange(name); } -bool Exchange::isReservedName() -{ - return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos; -} - void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) { if (enabled(createPolicy, mode)) { try { - if (isReservedName()) { - try { - sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); - } catch (const qpid::framing::NotFoundException& /*e*/) { - throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str()); - } - - } else { - std::string type = specifiedType; - if (type.empty()) type = TOPIC_EXCHANGE; - session.exchangeDeclare(arg::exchange=name, - arg::type=type, - arg::durable=durable, - arg::autoDelete=autoDelete, - arg::alternateExchange=alternateExchange, - arg::arguments=arguments); - } + std::string type = specifiedType; + if (type.empty()) type = TOPIC_EXCHANGE; + session.exchangeDeclare(arg::exchange=name, + arg::type=type, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); nodeBindings.bind(session); session.sync(); } catch (const qpid::framing::NotAllowedException& e) { @@ -852,7 +822,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (*i->second != *v) { + } else if (i->second != v) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } |