summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp145
1 files changed, 78 insertions, 67 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index f595b81724..bab6f2ea55 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -44,9 +44,9 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
@@ -79,8 +79,8 @@ namespace
{
inline void mgntEnqStats(const Message& msg,
- _qmf::Queue* mgmtObject,
- _qmf::Broker* brokerMgmtObject)
+ _qmf::Queue::shared_ptr mgmtObject,
+ _qmf::Broker::shared_ptr brokerMgmtObject)
{
if (mgmtObject != 0) {
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
@@ -103,8 +103,8 @@ inline void mgntEnqStats(const Message& msg,
}
inline void mgntDeqStats(const Message& msg,
- _qmf::Queue* mgmtObject,
- _qmf::Broker* brokerMgmtObject)
+ _qmf::Queue::shared_ptr mgmtObject,
+ _qmf::Broker::shared_ptr brokerMgmtObject)
{
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
@@ -166,13 +166,11 @@ void Queue::TxPublish::rollback() throw()
}
Queue::Queue(const string& _name, const QueueSettings& _settings,
-// MessageStore* const _store,
AsyncStore* const _asyncStore,
Manageable* parent,
Broker* b) :
name(_name),
-// store(_store),
asyncStore(_asyncStore),
owner(0),
consumerCount(0),
@@ -183,8 +181,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
messages(new MessageDeque()),
persistenceId(0),
settings(b ? merge(_settings, b->getOptions()) : _settings),
- mgmtObject(0),
- brokerMgmtObject(0),
eventMode(0),
broker(b),
deleted(false),
@@ -199,27 +195,24 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
qpid::amqp_0_10::translate(settings.asMap(), encodableSettings);
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
-
if (agent != 0) {
-// mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete);
+ mgmtObject = _qmf::Queue::shared_ptr(
+ new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete));
mgmtObject->set_arguments(settings.asMap());
-// agent->addObject(mgmtObject, 0, store != 0);
agent->addObject(mgmtObject, 0, asyncStore != 0);
- brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+ brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject());
if (brokerMgmtObject)
brokerMgmtObject->inc_queueCount();
}
}
+
+ if ( settings.isBrowseOnly ) {
+ QPID_LOG ( info, "Queue " << name << " is browse-only." );
+ }
}
Queue::~Queue()
{
- if (mgmtObject != 0) {
- mgmtObject->resourceDestroy();
- if (brokerMgmtObject)
- brokerMgmtObject->dec_queueCount();
- }
}
bool isLocalTo(const OwnershipToken* token, const Message& msg)
@@ -246,9 +239,6 @@ void Queue::deliver(Message msg, TxBuffer* txn){
//'link' for whatever protocol is used; that would let protocol
//specific stuff be kept out the queue
- // Check for deferred delivery in a cluster.
- if (broker && broker->deferDelivery(name, msg))
- return;
if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg, 0);
@@ -307,7 +297,6 @@ void Queue::process(Message& msg)
void Queue::release(const QueueCursor& position, bool markRedelivered)
{
- assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
@@ -333,7 +322,6 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position)
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
QPID_LOG(debug, "Attempting to dequeue message at " << position);
QueueCursor cursor;
Message* msg = messages->find(position, &cursor);
@@ -353,7 +341,6 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position)
bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
{
Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
Message* msg;
msg = messages->find(position);
@@ -375,12 +362,13 @@ bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
- checkNotDeleted(c);
+ if (!checkNotDeleted(c)) return false;
QueueListeners::NotificationSet set;
while (true) {
//TODO: reduce lock scope
Mutex::ScopedLock locker(messageLock);
- Message* msg = messages->next(*c);
+ QueueCursor cursor = c->getCursor(); // Save current position.
+ Message* msg = messages->next(*c); // Advances c.
if (msg) {
if (msg->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
@@ -419,6 +407,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
} else {
//message(s) are available but consumer hasn't got enough credit
QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ c->setCursor(cursor); // Restore cursor, will try again with credit
if (c->preAcquires()) {
//let someone else try
listeners.populate(set);
@@ -480,7 +469,6 @@ bool Queue::find(SequenceNumber pos, Message& msg) const
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
{
- assertClusterSafe();
{
Mutex::ScopedLock locker(messageLock);
// NOTE: consumerCount is actually a count of all
@@ -488,6 +476,11 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
// Check for exclusivity of acquiring consumers.
size_t acquiringConsumers = consumerCount - browserCount;
if (c->preAcquires()) {
+ if(settings.isBrowseOnly) {
+ throw NotAllowedException(
+ QPID_MSG("Queue " << name << " is browse only. Refusing acquiring consumer."));
+ }
+
if(exclusive) {
throw ResourceLockedException(
QPID_MSG("Queue " << getName()
@@ -502,22 +495,29 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
}
}
}
- else
+ else if(c->isCounted()) {
browserCount++;
- consumerCount++;
- //reset auto deletion timer if necessary
- if (settings.autoDeleteDelay && autoDeleteTask) {
- autoDeleteTask->cancel();
}
- observeConsumerAdd(*c, locker);
+ if(c->isCounted()) {
+ consumerCount++;
+
+ //reset auto deletion timer if necessary
+ if (settings.autoDeleteDelay && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
+
+ observeConsumerAdd(*c, locker);
+ }
+ }
+ if (mgmtObject != 0 && c->isCounted()) {
+ mgmtObject->inc_consumerCount();
}
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
}
void Queue::cancel(Consumer::shared_ptr c)
{
removeListener(c);
+ if(c->isCounted())
{
Mutex::ScopedLock locker(messageLock);
consumerCount--;
@@ -525,8 +525,9 @@ void Queue::cancel(Consumer::shared_ptr c)
if(exclusive) exclusive = 0;
observeConsumerRemove(*c, locker);
}
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ if (mgmtObject != 0 && c->isCounted()) {
+ mgmtObject->dec_consumerCount();
+ }
}
/**
@@ -733,7 +734,6 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
void Queue::push(Message& message, bool /*isRecovery*/)
{
- assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
@@ -1101,8 +1101,16 @@ void Queue::destroyed()
notifyDeleted();
{
Mutex::ScopedLock lock(messageLock);
+ for_each(observers.begin(), observers.end(),
+ boost::bind(&QueueObserver::destroy, _1));
observers.clear();
}
+
+ if (mgmtObject != 0) {
+ mgmtObject->resourceDestroy();
+ if (brokerMgmtObject)
+ brokerMgmtObject->dec_queueCount();
+ }
}
void Queue::notifyDeleted()
@@ -1136,7 +1144,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
{
if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore)
{
- ManagementObject* childObj = externalQueueStore->GetManagementObject();
+ ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject();
if (childObj != 0)
childObj->setReference(mgmtObject->getObjectId());
}
@@ -1180,6 +1188,7 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
{
alternateExchange = exchange;
+ alternateExchange->incAlternateUsers();
if (mgmtObject) {
if (exchange.get() != 0)
mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());
@@ -1197,14 +1206,10 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::strin
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
- QPID_LOG(debug, "Auto-deleting " << queue->getName());
- queue->destroyed();
-
- if (broker.getManagementAgent())
- broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName()));
- QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName()
+ QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName()
<< " user:" << userId
<< " rhost:" << connectionId );
+ queue->destroyed();
}
}
@@ -1233,7 +1238,7 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::st
if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC));
queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
- broker.getClusterTimer().add(queue->autoDeleteTask);
+ broker.getTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
} else {
tryAutoDeleteImpl(broker, queue, connectionId, userId);
@@ -1290,7 +1295,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
externalQueueStore = inst;
if (inst) {
- ManagementObject* childObj = inst->GetManagementObject();
+ ManagementObject::shared_ptr childObj = inst->GetManagementObject();
if (childObj != 0 && mgmtObject != 0)
childObj->setReference(mgmtObject->getObjectId());
}
@@ -1378,9 +1383,9 @@ void Queue::countLoadedFromDisk(uint64_t size) const
}
-ManagementObject* Queue::GetManagementObject (void) const
+ManagementObject::shared_ptr Queue::GetManagementObject(void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext)
@@ -1459,6 +1464,16 @@ SequenceNumber Queue::getPosition() {
return sequence;
}
+void Queue::getRange(framing::SequenceNumber& front, framing::SequenceNumber& back,
+ SubscriptionType type)
+{
+ Mutex::ScopedLock locker(messageLock);
+ QueueCursor cursor(type);
+ back = sequence;
+ Message* message = messages->next(cursor);
+ front = message ? message->getSequence() : back+1;
+}
+
int Queue::getEventMode() { return eventMode; }
void Queue::recoveryComplete(ExchangeRegistry& exchanges)
@@ -1493,20 +1508,11 @@ void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&)
mgntEnqStats(m, mgmtObject, brokerMgmtObject);
}
-// Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's
-// state is not changed while listeners is referenced.
-QueueListeners& Queue::getListeners() { return listeners; }
-
-// Note: accessing messages outside of lock is dangerous. Caller must ensure the queue's
-// state is not changed while messages is referenced.
-Messages& Queue::getMessages() { return *messages; }
-const Messages& Queue::getMessages() const { return *messages; }
-
-void Queue::checkNotDeleted(const Consumer::shared_ptr& c)
+bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
{
- if (deleted && !c->hideDeletedError()) {
+ if (deleted && !c->hideDeletedError())
throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted."));
- }
+ return !deleted;
}
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
@@ -1641,7 +1647,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
{
- Monitor::ScopedLock l(parent.messageLock); /** @todo: use a dedicated lock instead of messageLock */
+ Monitor::ScopedLock l(usageLock);
if (parent.deleted) {
return false;
} else {
@@ -1652,15 +1658,20 @@ bool Queue::UsageBarrier::acquire()
void Queue::UsageBarrier::release()
{
- Monitor::ScopedLock l(parent.messageLock);
- if (--count == 0) parent.messageLock.notifyAll();
+ Monitor::ScopedLock l(usageLock);
+ if (--count == 0) usageLock.notifyAll();
}
void Queue::UsageBarrier::destroy()
{
- Monitor::ScopedLock l(parent.messageLock);
+ Monitor::ScopedLock l(usageLock);
parent.deleted = true;
- while (count) parent.messageLock.wait();
+ while (count) usageLock.wait();
+}
+
+void Queue::addArgument(const string& key, const types::Variant& value) {
+ settings.original.insert(types::Variant::Map::value_type(key, value));
+ if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
}
}}