summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/AddressResolution.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp60
1 files changed, 15 insertions, 45 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 16e5fde075..f1295a3b66 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -129,10 +129,6 @@ const std::string HEADERS_EXCHANGE("headers");
const std::string XML_EXCHANGE("xml");
const std::string WILDCARD_ANY("#");
-//exchange prefixes:
-const std::string PREFIX_AMQ("amq.");
-const std::string PREFIX_QPID("qpid.");
-
const Verifier verifier;
}
@@ -203,7 +199,6 @@ class Exchange : protected Node
void checkCreate(qpid::client::AsyncSession&, CheckMode);
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
- bool isReservedName();
protected:
const std::string specifiedType;
@@ -238,8 +233,6 @@ class Subscription : public Exchange, public MessageSource
const bool reliable;
const bool durable;
const std::string actualType;
- const bool exclusiveQueue;
- const bool exclusiveSubscription;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
@@ -314,7 +307,6 @@ struct Opt
Opt& operator/(const std::string& name);
operator bool() const;
std::string str() const;
- bool asBool(bool defaultValue) const;
const Variant::List& asList() const;
void collect(qpid::framing::FieldTable& args) const;
@@ -346,12 +338,6 @@ Opt::operator bool() const
return value && !value->isVoid() && value->asBool();
}
-bool Opt::asBool(bool defaultValue) const
-{
- if (value) return value->asBool();
- else return defaultValue;
-}
-
std::string Opt::str() const
{
if (value) return value->asString();
@@ -495,7 +481,7 @@ std::string Subscription::getSubscriptionName(const std::string& base, const std
if (name.empty()) {
return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
} else {
- return name;
+ return (boost::format("%1%_%2%") % base % name).str();
}
}
@@ -504,9 +490,7 @@ Subscription::Subscription(const Address& address, const std::string& type)
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
durable(Opt(address)/LINK/DURABLE),
- actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
- exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
- exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -566,7 +550,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
checkAssert(session, FOR_RECEIVER);
//create subscription queue:
- session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
+ session.queueDeclare(arg::queue=queue, arg::exclusive=true,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
//'default' binding:
bindings.bind(session);
@@ -575,15 +559,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
- session.messageSubscribe(arg::queue=queue, arg::destination=destination,
- arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
linkBindings.unbind(session);
session.messageCancel(destination);
- if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
+ session.queueDelete(arg::queue=queue);
checkDelete(session, FOR_RECEIVER);
}
@@ -777,32 +761,18 @@ Exchange::Exchange(const Address& a) : Node(a),
linkBindings.setDefaultExchange(name);
}
-bool Exchange::isReservedName()
-{
- return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos;
-}
-
void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
try {
- if (isReservedName()) {
- try {
- sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
- } catch (const qpid::framing::NotFoundException& /*e*/) {
- throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str());
- }
-
- } else {
- std::string type = specifiedType;
- if (type.empty()) type = TOPIC_EXCHANGE;
- session.exchangeDeclare(arg::exchange=name,
- arg::type=type,
- arg::durable=durable,
- arg::autoDelete=autoDelete,
- arg::alternateExchange=alternateExchange,
- arg::arguments=arguments);
- }
+ std::string type = specifiedType;
+ if (type.empty()) type = TOPIC_EXCHANGE;
+ session.exchangeDeclare(arg::exchange=name,
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
nodeBindings.bind(session);
session.sync();
} catch (const qpid::framing::NotAllowedException& e) {
@@ -852,7 +822,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
- } else if (*i->second != *v) {
+ } else if (i->second != v) {
throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}