diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueSettings.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueSettings.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 5 |
4 files changed, 20 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 0dd4cb7b10..b7096b5ea0 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -206,6 +206,10 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, brokerMgmtObject->inc_queueCount(); } } + + if ( settings.isBrowseOnly ) { + QPID_LOG ( info, "Queue " << name << " is browse-only." ); + } } Queue::~Queue() @@ -483,6 +487,11 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) // Check for exclusivity of acquiring consumers. size_t acquiringConsumers = consumerCount - browserCount; if (c->preAcquires()) { + if(settings.isBrowseOnly) { + throw NotAllowedException( + QPID_MSG("Queue " << name << " is browse only. Refusing acquiring consumer.")); + } + if(exclusive) { throw ResourceLockedException( QPID_MSG("Queue " << getName() diff --git a/cpp/src/qpid/broker/QueueSettings.cpp b/cpp/src/qpid/broker/QueueSettings.cpp index b180a89e43..f31dda7444 100644 --- a/cpp/src/qpid/broker/QueueSettings.cpp +++ b/cpp/src/qpid/broker/QueueSettings.cpp @@ -39,6 +39,7 @@ const std::string POLICY_TYPE("qpid.policy_type"); const std::string POLICY_TYPE_REJECT("reject"); const std::string POLICY_TYPE_RING("ring"); const std::string NO_LOCAL("no-local"); +const std::string BROWSE_ONLY("browse-only"); const std::string TRACE_ID("qpid.trace.id"); const std::string TRACE_EXCLUDES("qpid.trace.exclude"); const std::string LVQ_KEY("qpid.last_value_queue_key"); @@ -82,6 +83,7 @@ QueueSettings::QueueSettings(bool d, bool a) : addTimestamp(false), dropMessagesAtLimit(false), noLocal(false), + isBrowseOnly(false), autoDeleteDelay(0), alertRepeatInterval(60) {} @@ -108,6 +110,9 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v } else if (key == NO_LOCAL) { noLocal = value; return true; + } else if (key == BROWSE_ONLY) { + isBrowseOnly = value; + return true; } else if (key == TRACE_ID) { traceId = value.asString(); return true; diff --git a/cpp/src/qpid/broker/QueueSettings.h b/cpp/src/qpid/broker/QueueSettings.h index 3256266df7..f955530313 100644 --- a/cpp/src/qpid/broker/QueueSettings.h +++ b/cpp/src/qpid/broker/QueueSettings.h @@ -59,6 +59,7 @@ struct QueueSettings bool dropMessagesAtLimit;//aka ring queue policy bool noLocal; + bool isBrowseOnly; std::string traceId; std::string traceExcludes; uint64_t autoDeleteDelay;//queueTtl? diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index c973098020..24c64bc521 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -422,6 +422,11 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); + if (queue->getSettings().isBrowseOnly && acquireMode == 0) { + QPID_LOG(info, "Overriding request to consume from browse-only queue " << queue->getName()); + acquireMode = 1; + } + // We allow browsing (acquireMode == 1) of exclusive queues, this is required by HA. if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0) throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue " |