diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-09-22 17:36:01 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-09-22 17:36:01 +0000 |
commit | f13c131456c4fb0a027513242f2d8253aad75ca0 (patch) | |
tree | 5ccd77aeb648695b41c0444e44ffa039e012d124 | |
parent | 3d3fb015b49b088a6e1f641437cd6b7acb0ed6ec (diff) | |
download | qpid-python-f13c131456c4fb0a027513242f2d8253aad75ca0.tar.gz |
Joint checkin from gsim, kpvdr, cctrieloff. See QPID-2102: Exceeding reject queue policy under a transaction causes broker crash
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@817742 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/DtxAck.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 61 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 141 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.h | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.cpp | 54 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.h | 14 | ||||
-rw-r--r-- | cpp/src/tests/ExchangeTest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/MessageUtils.h | 5 | ||||
-rw-r--r-- | cpp/src/tests/QueuePolicyTest.cpp | 58 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 341 | ||||
-rw-r--r-- | cpp/src/tests/TxPublishTest.cpp | 2 |
19 files changed, 631 insertions, 192 deletions
diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp index b189ef4cdb..bca3f90bbe 100644 --- a/cpp/src/qpid/broker/DtxAck.cpp +++ b/cpp/src/qpid/broker/DtxAck.cpp @@ -48,12 +48,26 @@ bool DtxAck::prepare(TransactionContext* ctxt) throw() void DtxAck::commit() throw() { - for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed)); - pending.clear(); + try { + for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed)); + pending.clear(); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } + } void DtxAck::rollback() throw() { - for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue)); - pending.clear(); + try { + for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue)); + pending.clear(); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to complete rollback: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to complete rollback (unknown error)"); + } + } diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 7360010192..ae69a429c8 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -181,11 +181,23 @@ void Message::decodeContent(framing::Buffer& buffer) loaded = true; } -void Message::releaseContent(MessageStore* _store) +void Message::tryReleaseContent() { - if (!store) { - store = _store; + if (checkContentReleasable()) { + releaseContent(); } +} + +void Message::releaseContent(MessageStore* s) +{ + //deprecated, use setStore(store); releaseContent(); instead + if (!store) setStore(s); + releaseContent(); +} + +void Message::releaseContent() +{ + sys::Mutex::ScopedLock l(lock); if (store) { if (!getPersistenceId()) { intrusive_ptr<PersistableMessage> pmsg(this); @@ -234,6 +246,7 @@ bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxC void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { + sys::Mutex::ScopedLock l(lock); if (isContentReleased() && !frames.isComplete()) { uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index e4d09b1042..3845146955 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -129,12 +129,9 @@ public: QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer); QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer); - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - void releaseContent(MessageStore* store); + void tryReleaseContent(); + void releaseContent(); + void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead void destroy(); bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const; diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index 14b233fd6c..b1a2b77b05 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -80,7 +80,7 @@ void MessageBuilder::handle(AMQFrame& frame) && !NullMessageStore::isNullStore(store) && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */) { - message->releaseContent(store); + message->releaseContent(); staging = true; } } @@ -96,6 +96,7 @@ void MessageBuilder::end() void MessageBuilder::start(const SequenceNumber& id) { message = intrusive_ptr<Message>(new Message(id)); + message->setStore(store); state = METHOD; staging = false; } diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 2ef223aa81..303a0501f4 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -36,7 +36,6 @@ PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : asyncEnqueueCounter(0), asyncDequeueCounter(0), - contentReleased(false), store(0) {} @@ -59,9 +58,15 @@ void PersistableMessage::flush() } } -void PersistableMessage::setContentReleased() {contentReleased = true; } +void PersistableMessage::setContentReleased() +{ + contentReleaseState.released = true; +} -bool PersistableMessage::isContentReleased()const { return contentReleased; } +bool PersistableMessage::isContentReleased() const +{ + return contentReleaseState.released; +} bool PersistableMessage::isEnqueueComplete() { sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); @@ -153,6 +158,26 @@ void PersistableMessage::dequeueAsync() { asyncDequeueCounter++; } +PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} + +void PersistableMessage::setStore(MessageStore* s) +{ + store = s; +} + +void PersistableMessage::requestContentRelease() +{ + contentReleaseState.requested = true; +} +void PersistableMessage::blockContentRelease() +{ + contentReleaseState.blocked = true; +} +bool PersistableMessage::checkContentReleasable() +{ + return contentReleaseState.requested && !contentReleaseState.blocked; +} + }} diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 0274b41375..a968480249 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -68,8 +68,16 @@ class PersistableMessage : public Persistable void enqueueAsync(); void dequeueAsync(); - bool contentReleased; syncList synclist; + struct ContentReleaseState + { + bool blocked; + bool requested; + bool released; + + ContentReleaseState(); + }; + ContentReleaseState contentReleaseState; protected: /** Called when all enqueues are complete for this message. */ @@ -97,6 +105,11 @@ class PersistableMessage : public Persistable void flush(); bool isContentReleased() const; + + void setStore(MessageStore*); + void requestContentRelease(); + void blockContentRelease(); + bool checkContentReleasable(); QPID_BROKER_EXTERN bool isEnqueueComplete(); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b2a8e223c5..1cc48a949e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -181,6 +181,8 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ + if (policy.get()) policy->recoverEnqueued(msg); + push(msg, true); if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure @@ -563,15 +565,12 @@ void Queue::popMsg(QueuedMessage& qmsg) } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ - Messages dequeues; QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (policy.get()) { - policy->tryEnqueue(qm); - //depending on policy, may have some dequeues - if (!isRecovery) pendingDequeues.swap(dequeues); + policy->enqueued(qm); } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); @@ -608,10 +607,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ } } copy.notify(); - if (!dequeues.empty()) { - //depending on policy, may have some dequeues - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); - } } QueuedMessage Queue::getFront() @@ -697,8 +692,12 @@ void Queue::setLastNodeFailure() } // return true if store exists, -bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) +bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck) { + if (policy.get() && !suppressPolicyCheck) { + policy->tryEnqueue(msg); + } + if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } @@ -713,9 +712,21 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) store->enqueue(ctxt, pmsg, *this); return true; } + if (!store) { + //Messages enqueued on a transient queue should be prevented + //from having their content released as it may not be + //recoverable by these queue for delivery + msg->blockContentRelease(); + } return false; } +void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) +{ + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->enqueueAborted(msg); +} + // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { @@ -781,7 +792,15 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings, bool recovering) { - setPolicy(QueuePolicy::createQueuePolicy(_settings)); + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && + (!store || NullMessageStore::isNullStore(store))) { + QPID_LOG(warning, "Flow to disk not valid for non-persisted queue"); + FieldTable copy(_settings); + copy.erase(QueuePolicy::typeKey); + setPolicy(QueuePolicy::createQueuePolicy(getName(), copy)); + } else { + setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); + } //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue with no-local=" << noLocal); @@ -975,19 +994,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } -bool Queue::releaseMessageContent(const QueuedMessage& m) -{ - if (store && !NullMessageStore::isNullStore(store)) { - QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); - m.payload->releaseContent(store); - return true; - } else { - QPID_LOG(warning, "Message " << m.position << " on " << name - << " cannot be released from memory as the queue is not durable"); - return false; - } -} - ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; @@ -1044,11 +1050,12 @@ void Queue::insertSequenceNumbers(const std::string& key) void Queue::enqueued(const QueuedMessage& m) { if (m.payload) { - if (policy.get()) policy->tryEnqueue(m); - mgntEnqStats(m.payload); - if (m.payload->isPersistent()) { - enqueue ( 0, m.payload ); + if (policy.get()) { + policy->recoverEnqueued(m.payload); + policy->enqueued(m); } + mgntEnqStats(m.payload); + enqueue ( 0, m.payload, true ); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 77799fd967..9ac5e3f5e9 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -239,7 +239,8 @@ namespace qpid { QPID_BROKER_EXTERN void setLastNodeFailure(); QPID_BROKER_EXTERN void clearLastNodeFailure(); - bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); + bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck = false); + void enqueueAborted(boost::intrusive_ptr<Message> msg); /** * dequeue from store (only done once messages is acknowledged) */ @@ -315,8 +316,6 @@ namespace qpid { bindings.eachBinding(f); } - bool releaseMessageContent(const QueuedMessage&); - void popMsg(QueuedMessage& qmsg); /** Set the position sequence number for the next message on the queue. diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 39afe90134..0f1f7f370f 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -28,8 +28,8 @@ using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {} +QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {} void QueuePolicy::enqueued(uint64_t _size) { @@ -39,18 +39,15 @@ void QueuePolicy::enqueued(uint64_t _size) void QueuePolicy::dequeued(uint64_t _size) { - //Note: underflow detection is not reliable in the face of - //concurrent updates (at present locking in Queue.cpp prevents - //these anyway); updates are atomic and are safe regardless. if (maxCount) { - if (count.get() > 0) { + if (count > 0) { --count; } else { throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this)); } } if (maxSize) { - if (_size > size.get()) { + if (_size > size) { throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this)); } else { size -= _size; @@ -58,49 +55,53 @@ void QueuePolicy::dequeued(uint64_t _size) } } -bool QueuePolicy::checkLimit(const QueuedMessage& m) +bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { - bool sizeExceeded = maxSize && (size.get() + m.payload->contentSize()) > maxSize; - bool countExceeded = maxCount && (count.get() + 1) > maxCount; + bool sizeExceeded = maxSize && (size + m->contentSize()) > maxSize; + bool countExceeded = maxCount && (count + 1) > maxCount; bool exceeded = sizeExceeded || countExceeded; if (exceeded) { if (!policyExceeded) { - policyExceeded = true; - if (m.queue) { - if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << m.queue->getName()); - if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << m.queue->getName()); - } + policyExceeded = true; + if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << name); + if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << name); } } else { if (policyExceeded) { policyExceeded = false; - if (m.queue) { - QPID_LOG(info, "Queue cumulative message size and message count within policy for " << m.queue->getName()); - } + QPID_LOG(info, "Queue cumulative message size and message count within policy for " << name); } } return !exceeded; } -void QueuePolicy::tryEnqueue(const QueuedMessage& m) +void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) { + qpid::sys::Mutex::ScopedLock l(lock); if (checkLimit(m)) { - enqueued(m); + enqueued(m->contentSize()); } else { - std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue"); - throw ResourceLimitExceededException( - QPID_MSG("Policy exceeded on " << queue << " by message " << m.position - << " of size " << m.payload->contentSize() << " , policy: " << *this)); + throw ResourceLimitExceededException(QPID_MSG("Policy exceeded on " << name << ", policy: " << *this)); } } -void QueuePolicy::enqueued(const QueuedMessage& m) +void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m) +{ + qpid::sys::Mutex::ScopedLock l(lock); + enqueued(m->contentSize()); +} + +void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m) { - enqueued(m.payload->contentSize()); + qpid::sys::Mutex::ScopedLock l(lock); + dequeued(m->contentSize()); } +void QueuePolicy::enqueued(const QueuedMessage&) {} + void QueuePolicy::dequeued(const QueuedMessage& m) { + qpid::sys::Mutex::ScopedLock l(lock); dequeued(m.payload->contentSize()); } @@ -132,7 +133,7 @@ std::string QueuePolicy::getType(const FieldTable& settings) std::transform(t.begin(), t.end(), t.begin(), tolower); if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; } - return FLOW_TO_DISK; + return REJECT; } void QueuePolicy::setDefaultMaxSize(uint64_t s) @@ -148,8 +149,8 @@ void QueuePolicy::encode(Buffer& buffer) const { buffer.putLong(maxCount); buffer.putLongLong(maxSize); - buffer.putLong(count.get()); - buffer.putLongLong(size.get()); + buffer.putLong(count); + buffer.putLongLong(size); } void QueuePolicy::decode ( Buffer& buffer ) @@ -179,16 +180,18 @@ const std::string QueuePolicy::RING("ring"); const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); -FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : - QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {} +FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) : + QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {} -bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m) +bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) { - return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m); + if (!QueuePolicy::checkLimit(m)) m->requestContentRelease(); + return true; } -RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} +RingQueuePolicy::RingQueuePolicy(const std::string& _name, + uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} bool before(const QueuedMessage& a, const QueuedMessage& b) { @@ -197,7 +200,6 @@ bool before(const QueuedMessage& a, const QueuedMessage& b) void RingQueuePolicy::enqueued(const QueuedMessage& m) { - QueuePolicy::enqueued(m); qpid::sys::Mutex::ScopedLock l(lock); //need to insert in correct location based on position queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m); @@ -222,42 +224,28 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) return find(m, pendingDequeues, false) || find(m, queue, false); } -bool RingQueuePolicy::checkLimit(const QueuedMessage& m) +bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept QueuedMessage oldest; - { - qpid::sys::Mutex::ScopedLock l(lock); - if (queue.empty()) { - QPID_LOG(debug, "Message too large for ring queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << " [" << *this << "] " - << ": message size = " << m.payload->contentSize() << " bytes"); - return false; - } - oldest = queue.front(); + if (queue.empty()) { + QPID_LOG(debug, "Message too large for ring queue " << name + << " [" << *this << "] " + << ": message size = " << m->contentSize() << " bytes"); + return false; } + oldest = queue.front(); if (oldest.queue->acquire(oldest) || !strict) { - { - //TODO: fix this! In the current code, this method is - //only ever called with the Queue lock already taken. This - //should not be relied upon going forward however and - //clearly the locking in this class is insufficient as - //there is no guarantee that the message previously atthe - //front is still there. - qpid::sys::Mutex::ScopedLock l(lock); - queue.pop_front(); - pendingDequeues.push_back(oldest); - } - oldest.queue->addPendingDequeue(oldest); - QPID_LOG(debug, "Ring policy triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << ": removed message " << oldest.position << " to make way for " << m.position); + queue.pop_front(); + pendingDequeues.push_back(oldest); + QPID_LOG(debug, "Ring policy triggered in " << name + << ": removed message " << oldest.position << " to make way for new message"); + qpid::sys::Mutex::ScopedUnlock u(lock); + oldest.queue->dequeue(0, oldest); return true; } else { - QPID_LOG(debug, "Ring policy could not be triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) + QPID_LOG(debug, "Ring policy could not be triggered in " << name << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); //in strict mode, if oldest message has been delivered (hence //cannot be acquired) but not yet acked, it should not be @@ -277,25 +265,36 @@ bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) return false; } +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +{ + return createQueuePolicy("<unspecified>", maxCount, maxSize, type); +} + std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings) { + return createQueuePolicy("<unspecified>", settings); +} + +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings) +{ uint32_t maxCount = getInt(settings, maxCountKey, 0); uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize); if (maxCount || maxSize) { - return createQueuePolicy(maxCount, maxSize, getType(settings)); + return createQueuePolicy(name, maxCount, maxSize, getType(settings)); } else { return std::auto_ptr<QueuePolicy>(); } } -std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, + uint32_t maxCount, uint64_t maxSize, const std::string& type) { if (type == RING || type == RING_STRICT) { - return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type)); } else if (type == FLOW_TO_DISK) { - return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize)); + return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize)); } else { - return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type)); } } @@ -305,10 +304,10 @@ namespace qpid { std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) { - if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get(); + if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size; else out << "size: unlimited"; out << "; "; - if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get(); + if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; else out << "count: unlimited"; out << "; type=" << p.type; return out; diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h index 54745876d5..65c52304f2 100644 --- a/cpp/src/qpid/broker/QueuePolicy.h +++ b/cpp/src/qpid/broker/QueuePolicy.h @@ -40,12 +40,11 @@ class QueuePolicy uint32_t maxCount; uint64_t maxSize; const std::string type; - qpid::sys::AtomicValue<uint32_t> count; - qpid::sys::AtomicValue<uint64_t> size; + uint32_t count; + uint64_t size; bool policyExceeded; static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); - static std::string getType(const qpid::framing::FieldTable& settings); public: static QPID_BROKER_EXTERN const std::string maxCountKey; @@ -57,10 +56,12 @@ class QueuePolicy static QPID_BROKER_EXTERN const std::string RING_STRICT; virtual ~QueuePolicy() {} - QPID_BROKER_EXTERN void tryEnqueue(const QueuedMessage&); + QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg); + virtual void enqueued(const QueuedMessage&); virtual void dequeued(const QueuedMessage&); virtual bool isEnqueued(const QueuedMessage&); - virtual bool checkLimit(const QueuedMessage&); QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings); uint32_t getMaxCount() const { return maxCount; } uint64_t getMaxSize() const { return maxSize; } @@ -69,15 +70,21 @@ class QueuePolicy uint32_t encodedSize() const; + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + static std::string getType(const qpid::framing::FieldTable& settings); static void setDefaultMaxSize(uint64_t); friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueuePolicy&); protected: - QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + const std::string name; + qpid::sys::Mutex lock; - virtual void enqueued(const QueuedMessage&); + QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + + virtual bool checkLimit(boost::intrusive_ptr<Message> msg); void enqueued(uint64_t size); void dequeued(uint64_t size); }; @@ -86,21 +93,20 @@ class QueuePolicy class FlowToDiskPolicy : public QueuePolicy { public: - FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize); - bool checkLimit(const QueuedMessage&); + FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize); + bool checkLimit(boost::intrusive_ptr<Message> msg); }; class RingQueuePolicy : public QueuePolicy { public: - RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); + RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); void enqueued(const QueuedMessage&); void dequeued(const QueuedMessage&); bool isEnqueued(const QueuedMessage&); - bool checkLimit(const QueuedMessage&); + bool checkLimit(boost::intrusive_ptr<Message> msg); private: typedef std::deque<QueuedMessage> Messages; - qpid::sys::Mutex lock; Messages pendingDequeues; Messages queue; const bool strict; diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index bdd5f33601..a59d29c3cc 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -356,6 +356,9 @@ void SemanticState::handle(intrusive_ptr<Message> msg) { } else { DeliverableMessage deliverable(msg); route(msg, deliverable); + if (msg->checkContentReleasable()) { + msg->releaseContent(); + } } } diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index e47ac84990..928ac12c10 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -88,7 +88,13 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw() void TxAccept::commit() throw() { - ops.commit(); + try { + ops.commit(); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } } void TxAccept::rollback() throw() {} diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 17b99fd883..4b083033ea 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -26,9 +26,14 @@ using namespace qpid::broker; TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {} -bool TxPublish::prepare(TransactionContext* ctxt) throw(){ +bool TxPublish::prepare(TransactionContext* ctxt) throw() +{ try{ - for_each(queues.begin(), queues.end(), Prepare(ctxt, msg)); + while (!queues.empty()) { + prepare(ctxt, queues.front()); + prepared.push_back(queues.front()); + queues.pop_front(); + } return true; }catch(const std::exception& e){ QPID_LOG(error, "Failed to prepare: " << e.what()); @@ -38,11 +43,30 @@ bool TxPublish::prepare(TransactionContext* ctxt) throw(){ return false; } -void TxPublish::commit() throw(){ - for_each(queues.begin(), queues.end(), Commit(msg)); +void TxPublish::commit() throw() +{ + try { + for_each(prepared.begin(), prepared.end(), Commit(msg)); + if (msg->checkContentReleasable()) { + msg->releaseContent(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } } -void TxPublish::rollback() throw(){ +void TxPublish::rollback() throw() +{ + try { + for_each(prepared.begin(), prepared.end(), Rollback(msg)); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to complete rollback: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to complete rollback (unknown error)"); + } + } void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ @@ -54,16 +78,14 @@ void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ } } -TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg) - : ctxt(_ctxt), msg(_msg){} - -void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){ +void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue) +{ if (!queue->enqueue(ctxt, msg)){ /** - * if not store then mark message for ack and deleivery once - * commit happens, as async IO will never set it when no store - * exists - */ + * if not store then mark message for ack and deleivery once + * commit happens, as async IO will never set it when no store + * exists + */ msg->enqueueComplete(); } } @@ -74,6 +96,12 @@ void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){ queue->process(msg); } +TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){} + +void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){ + queue->enqueueAborted(msg); +} + uint64_t TxPublish::contentSize () { return msg->contentSize (); diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index d5cf5639c4..b6ab9767ab 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -47,23 +47,25 @@ namespace qpid { * dispatch or to be added to the in-memory queue. */ class TxPublish : public TxOp, public Deliverable{ - class Prepare{ - TransactionContext* ctxt; + + class Commit{ boost::intrusive_ptr<Message>& msg; public: - Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg); + Commit(boost::intrusive_ptr<Message>& msg); void operator()(const boost::shared_ptr<Queue>& queue); }; - - class Commit{ + class Rollback{ boost::intrusive_ptr<Message>& msg; public: - Commit(boost::intrusive_ptr<Message>& msg); + Rollback(boost::intrusive_ptr<Message>& msg); void operator()(const boost::shared_ptr<Queue>& queue); }; boost::intrusive_ptr<Message> msg; std::list<Queue::shared_ptr> queues; + std::list<Queue::shared_ptr> prepared; + + void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>); public: QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg); diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index 44835c6184..88a1cd99c2 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -60,7 +60,7 @@ QPID_AUTO_TEST_CASE(testMe) queue.reset(); queue2.reset(); - intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "key", "id")); + intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "key", false, "id")); DeliverableMessage msg(msgPtr); topic.route(msg, "abc", 0); direct.route(msg, "abc", 0); diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h index dae74cce7d..a1b140d484 100644 --- a/cpp/src/tests/MessageUtils.h +++ b/cpp/src/tests/MessageUtils.h @@ -34,7 +34,8 @@ namespace tests { struct MessageUtils { static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="", - const Uuid& messageId=Uuid(true), uint64_t contentSize = 0) + const bool durable = false, const Uuid& messageId=Uuid(true), + uint64_t contentSize = 0) { boost::intrusive_ptr<broker::Message> msg(new broker::Message()); @@ -47,6 +48,8 @@ struct MessageUtils props->setContentLength(contentSize); props->setMessageId(messageId); msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + if (durable) + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2); return msg; } diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp index f40d30b588..875976db85 100644 --- a/cpp/src/tests/QueuePolicyTest.cpp +++ b/cpp/src/tests/QueuePolicyTest.cpp @@ -48,56 +48,56 @@ QueuedMessage createMessage(uint32_t size) QPID_AUTO_TEST_CASE(testCount) { - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 0)); + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 0)); BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize()); BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount()); QueuedMessage msg = createMessage(10); for (size_t i = 0; i < 5; i++) { - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); } try { - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); BOOST_FAIL("Policy did not fail on enqueuing sixth message"); } catch (const ResourceLimitExceededException&) {} policy->dequeued(msg); - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); try { - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)"); } catch (const ResourceLimitExceededException&) {} } QPID_AUTO_TEST_CASE(testSize) { - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(0, 50)); + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 0, 50)); QueuedMessage msg = createMessage(10); for (size_t i = 0; i < 5; i++) { - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); } try { - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); } catch (const ResourceLimitExceededException&) {} policy->dequeued(msg); - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); try { - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy); } catch (const ResourceLimitExceededException&) {} } QPID_AUTO_TEST_CASE(testBoth) { - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 50)); + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 50)); try { QueuedMessage msg = createMessage(51); - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy); } catch (const ResourceLimitExceededException&) {} @@ -108,17 +108,17 @@ QPID_AUTO_TEST_CASE(testBoth) messages.push_back(createMessage(2)); messages.push_back(createMessage(7)); for (size_t i = 0; i < messages.size(); i++) { - policy->tryEnqueue(messages[i]); + policy->tryEnqueue(messages[i].payload); } //size = 45 at this point, count = 5 try { QueuedMessage msg = createMessage(5); - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy); } catch (const ResourceLimitExceededException&) {} try { QueuedMessage msg = createMessage(10); - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); } catch (const ResourceLimitExceededException&) {} @@ -126,7 +126,7 @@ QPID_AUTO_TEST_CASE(testBoth) policy->dequeued(messages[0]); try { QueuedMessage msg = createMessage(20); - policy->tryEnqueue(msg); + policy->tryEnqueue(msg.payload); } catch (const ResourceLimitExceededException&) { BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy); } @@ -135,10 +135,10 @@ QPID_AUTO_TEST_CASE(testBoth) QPID_AUTO_TEST_CASE(testSettings) { //test reading and writing the policy from/to field table - std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy(101, 303)); + std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy("test", 101, 303)); FieldTable settings; a->update(settings); - std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy(settings)); + std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy("test", settings)); BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount()); BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize()); } @@ -146,7 +146,7 @@ QPID_AUTO_TEST_CASE(testSettings) QPID_AUTO_TEST_CASE(testRingPolicy) { FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::RING); + std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING); policy->update(args); ProxySessionFixture f; @@ -175,7 +175,7 @@ QPID_AUTO_TEST_CASE(testRingPolicy) QPID_AUTO_TEST_CASE(testStrictRingPolicy) { FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::RING_STRICT); + std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT); policy->update(args); ProxySessionFixture f; @@ -201,7 +201,7 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy) QPID_AUTO_TEST_CASE(testPolicyWithDtx) { FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::REJECT); + std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT); policy->update(args); ProxySessionFixture f; @@ -282,6 +282,22 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore) } catch (const ResourceLimitExceededException&) {} } +QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit) +{ + FieldTable args; + std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT); + policy->update(args); + + ProxySessionFixture f; + std::string q("q"); + f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); + f.session.txSelect(); + for (int i = 0; i < 10; i++) { + f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); + } + ScopedSuppressLogging sl; // Suppress messages for expected errors. + BOOST_CHECK_THROW(f.session.txCommit(), InternalErrorException); +} QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 841a19f7c1..756f4ac3f6 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -18,10 +18,13 @@ * under the License. * */ +#include "MessageUtils.h" #include "unit_test.h" #include "test_tools.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" @@ -30,12 +33,16 @@ #include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include <iostream> #include "boost/format.hpp" using boost::intrusive_ptr; using namespace qpid; using namespace qpid::broker; +using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; @@ -61,13 +68,14 @@ public: class FailOnDeliver : public Deliverable { - Message msg; + boost::intrusive_ptr<Message> msg; public: + FailOnDeliver() : msg(MessageUtils::createMessage()) {} void deliverTo(const boost::shared_ptr<Queue>& queue) { throw Exception(QPID_MSG("Invalid delivery to " << queue->getName())); } - Message& getMessage() { return msg; } + Message& getMessage() { return *(msg.get()); } }; intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) { @@ -210,8 +218,7 @@ QPID_AUTO_TEST_CASE(testDequeue){ } -QPID_AUTO_TEST_CASE(testBound) -{ +QPID_AUTO_TEST_CASE(testBound){ //test the recording of bindings, and use of those to allow a queue to be unbound string key("my-key"); FieldTable args; @@ -245,7 +252,6 @@ QPID_AUTO_TEST_CASE(testBound) } QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ - client::QueueOptions args; args.setPersistLastNode(); @@ -273,14 +279,35 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ } -class TestMessageStoreOC : public NullMessageStore +const std::string nullxid = ""; + +class SimpleDummyCtxt : public TransactionContext {}; + +class DummyCtxt : public TPCTransactionContext +{ + const std::string xid; + public: + DummyCtxt(const std::string& _xid) : xid(_xid) {} + static std::string getXid(TransactionContext& ctxt) + { + DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt)); + return c ? c->xid : nullxid; + } +}; + +class TestMessageStoreOC : public MessageStore { + std::set<std::string> prepared; + uint64_t nextPersistenceId; public: uint enqCnt; uint deqCnt; bool error; + TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {} + ~TestMessageStoreOC(){} + virtual void dequeue(TransactionContext*, const boost::intrusive_ptr<PersistableMessage>& /*msg*/, const PersistableQueue& /*queue*/) @@ -290,11 +317,12 @@ class TestMessageStoreOC : public NullMessageStore } virtual void enqueue(TransactionContext*, - const boost::intrusive_ptr<PersistableMessage>& /*msg*/, + const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& /* queue */) { if (error) throw Exception("Enqueue error test"); enqCnt++; + msg->enqueueComplete(); } void createError() @@ -302,8 +330,32 @@ class TestMessageStoreOC : public NullMessageStore error=true; } - TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {} - ~TestMessageStoreOC(){} + bool init(const Options*) { return true; } + void truncateInit(const bool) {} + void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); } + void destroy(PersistableQueue&) {} + void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); } + void destroy(const PersistableExchange&) {} + void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} + void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} + void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); } + void destroy(const PersistableConfig&) {} + void stage(const boost::intrusive_ptr<PersistableMessage>&) {} + void destroy(PersistableMessage&) {} + void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {} + void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&, + std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); } + void flush(const qpid::broker::PersistableQueue&) {} + uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; } + + std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); } + std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); } + void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); } + void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } + void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } + void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); } + + void recover(RecoveryManager&) {} }; @@ -703,7 +755,7 @@ not requeued to the store. QPID_AUTO_TEST_CASE(testLastNodeJournalError){ /* -simulate store excption going into last node standing +simulate store exception going into last node standing */ TestMessageStoreOC testStore; @@ -727,16 +779,271 @@ simulate store excption going into last node standing } -intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) { - intrusive_ptr<Message> msg(new Message()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); - AMQFrame header((AMQHeaderBody())); - msg->getFrames().append(method); - msg->getFrames().append(header); - msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); +intrusive_ptr<Message> mkMsg(MessageStore& store, std::string content = "", bool durable = false) +{ + intrusive_ptr<Message> msg = MessageUtils::createMessage("", "", durable); + if (content.size()) MessageUtils::addContent(msg, content); + msg->setStore(&store); return msg; } +QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ + + TestMessageStoreOC testStore; + client::QueueOptions args0; // No size policy + client::QueueOptions args1; + args1.setSizePolicy(FLOW_TO_DISK, 0, 1); + client::QueueOptions args2; + args2.setSizePolicy(FLOW_TO_DISK, 0, 2); + + // --- Fanout exchange bound to single transient queue ------------------------------------------------------------- + + FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue + Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit + tq1->configure(args1); + sbtFanout1.bind(tq1, "", 0); + + intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content + DeliverableMessage dmsg01(msg01); + sbtFanout1.route(dmsg01, "", 0); // Brings queue 1 to capacity limit + msg01->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg01->isContentReleased(), false); + BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); + + intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content + DeliverableMessage dmsg02(msg02); + BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException); + msg02->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg02->isContentReleased(), false); + BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); + + intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // transient w/ content + DeliverableMessage dmsg03(msg03); + BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException); + msg03->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg03->isContentReleased(), false); + BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); + + intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content + DeliverableMessage dmsg04(msg04); + BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException); + msg04->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg04->isContentReleased(), false); + BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); + + intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content + DeliverableMessage dmsg05(msg05); + BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException); + msg05->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg05->isContentReleased(), false); + BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); + + // --- Fanout exchange bound to single durable queue --------------------------------------------------------------- + + FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue + Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit + dq2->configure(args1); + sbdFanout2.bind(dq2, "", 0); + + intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content + DeliverableMessage dmsg06(msg06); + sbdFanout2.route(dmsg06, "", 0); // Brings queue 2 to capacity limit + msg06->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg06->isContentReleased(), false); + BOOST_CHECK_EQUAL(1u, dq2->getMessageCount()); + + intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content + DeliverableMessage dmsg07(msg07); + sbdFanout2.route(dmsg07, "", 0); + msg07->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg07->isContentReleased(), true); + BOOST_CHECK_EQUAL(2u, dq2->getMessageCount()); + + intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content + DeliverableMessage dmsg08(msg08); + sbdFanout2.route(dmsg08, "", 0); + msg08->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg08->isContentReleased(), true); + BOOST_CHECK_EQUAL(3u, dq2->getMessageCount()); + + intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content + DeliverableMessage dmsg09(msg09); + sbdFanout2.route(dmsg09, "", 0); + msg09->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg09->isContentReleased(), true); + BOOST_CHECK_EQUAL(4u, dq2->getMessageCount()); + + intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content + DeliverableMessage dmsg10(msg10); + sbdFanout2.route(dmsg10, "", 0); + msg10->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg10->isContentReleased(), true); + BOOST_CHECK_EQUAL(5u, dq2->getMessageCount()); + + // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------ + + FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues + Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2 + dq3->configure(args2); + mbdFanout3.bind(dq3, "", 0); + Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1 + dq4->configure(args1); + mbdFanout3.bind(dq4, "", 0); + Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit + dq5->configure(args0); + mbdFanout3.bind(dq5, "", 0); + + intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content + DeliverableMessage dmsg11(msg11); + mbdFanout3.route(dmsg11, "", 0); // Brings queues 3 and 4 to capacity limit + msg11->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg11->isContentReleased(), false); + BOOST_CHECK_EQUAL(1u, dq3->getMessageCount()); + BOOST_CHECK_EQUAL(1u, dq4->getMessageCount()); + BOOST_CHECK_EQUAL(1u, dq5->getMessageCount()); + + intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content + DeliverableMessage dmsg12(msg12); + mbdFanout3.route(dmsg12, "", 0); + msg12->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg12->isContentReleased(), true); + BOOST_CHECK_EQUAL(2u, dq3->getMessageCount()); + BOOST_CHECK_EQUAL(2u, dq4->getMessageCount()); + BOOST_CHECK_EQUAL(2u, dq5->getMessageCount()); + + intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content + DeliverableMessage dmsg13(msg13); + mbdFanout3.route(dmsg13, "", 0); + msg13->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg13->isContentReleased(), true); + BOOST_CHECK_EQUAL(3u, dq3->getMessageCount()); + BOOST_CHECK_EQUAL(3u, dq4->getMessageCount()); + BOOST_CHECK_EQUAL(3u, dq5->getMessageCount()); + + intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content + DeliverableMessage dmsg14(msg14); + mbdFanout3.route(dmsg14, "", 0); + msg14->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg14->isContentReleased(), true); + BOOST_CHECK_EQUAL(4u, dq3->getMessageCount()); + BOOST_CHECK_EQUAL(4u, dq4->getMessageCount()); + BOOST_CHECK_EQUAL(4u, dq5->getMessageCount()); + + intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content + DeliverableMessage dmsg15(msg15); + mbdFanout3.route(dmsg15, "", 0); + msg15->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg15->isContentReleased(), true); + BOOST_CHECK_EQUAL(5u, dq3->getMessageCount()); + BOOST_CHECK_EQUAL(5u, dq4->getMessageCount()); + BOOST_CHECK_EQUAL(5u, dq5->getMessageCount()); + + // Bind a transient queue, this should block the release of any further messages. + // Note: this will result in a violation of the count policy of dq3 and dq4 - but this + // is expected until a better overall multi-queue design is implemented. Similarly + // for the other tests in this section. + + Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit + tq6->configure(args0); + mbdFanout3.bind(tq6, "", 0); + + intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content + DeliverableMessage dmsg16(msg16); + mbdFanout3.route(dmsg16, "", 0); + msg16->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg16->isContentReleased(), false); + BOOST_CHECK_EQUAL(6u, dq3->getMessageCount()); + BOOST_CHECK_EQUAL(6u, dq4->getMessageCount()); + BOOST_CHECK_EQUAL(6u, dq5->getMessageCount()); + + intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content + DeliverableMessage dmsg17(msg17); + mbdFanout3.route(dmsg17, "", 0); + msg17->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg17->isContentReleased(), false); + BOOST_CHECK_EQUAL(7u, dq3->getMessageCount()); + BOOST_CHECK_EQUAL(7u, dq4->getMessageCount()); + BOOST_CHECK_EQUAL(7u, dq5->getMessageCount()); + + intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content + DeliverableMessage dmsg18(msg18); + mbdFanout3.route(dmsg18, "", 0); + msg18->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg18->isContentReleased(), false); + BOOST_CHECK_EQUAL(8u, dq3->getMessageCount()); + BOOST_CHECK_EQUAL(8u, dq4->getMessageCount()); + BOOST_CHECK_EQUAL(8u, dq5->getMessageCount()); + + intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content + DeliverableMessage dmsg19(msg19); + mbdFanout3.route(dmsg19, "", 0); + msg19->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg19->isContentReleased(), false); + BOOST_CHECK_EQUAL(9u, dq3->getMessageCount()); + BOOST_CHECK_EQUAL(9u, dq4->getMessageCount()); + BOOST_CHECK_EQUAL(9u, dq5->getMessageCount()); + + + // --- Fanout exchange bound to multiple durable and transient queues ---------------------------------------------- + + FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues + Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit + dq7->configure(args0); + mbmFanout4.bind(dq7, "", 0); + Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit + dq8->configure(args1); + mbmFanout4.bind(dq8, "", 0); + Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit + tq9->configure(args0); + mbmFanout4.bind(tq9, "", 0); + + intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content + DeliverableMessage dmsg20(msg20); + mbmFanout4.route(dmsg20, "", 0); // Brings queue 7 to capacity limit + msg20->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg20->isContentReleased(), false); + BOOST_CHECK_EQUAL(1u, dq7->getMessageCount()); + BOOST_CHECK_EQUAL(1u, dq8->getMessageCount()); + BOOST_CHECK_EQUAL(1u, tq9->getMessageCount()); + + intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content + DeliverableMessage dmsg21(msg21); + mbmFanout4.route(dmsg21, "", 0); + msg21->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg21->isContentReleased(), false); + BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit + BOOST_CHECK_EQUAL(2u, dq8->getMessageCount()); + BOOST_CHECK_EQUAL(2u, tq9->getMessageCount()); + + intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content + DeliverableMessage dmsg22(msg22); + mbmFanout4.route(dmsg22, "", 0); + msg22->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg22->isContentReleased(), false); + BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit + BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit + BOOST_CHECK_EQUAL(3u, tq9->getMessageCount()); + + intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content + DeliverableMessage dmsg23(msg23); + mbmFanout4.route(dmsg23, "", 0); + msg23->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg23->isContentReleased(), false); + BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit + BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit + BOOST_CHECK_EQUAL(4u, tq9->getMessageCount()); + + intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content + DeliverableMessage dmsg24(msg24); + mbmFanout4.route(dmsg24, "", 0); + msg24->tryReleaseContent(); + BOOST_CHECK_EQUAL(msg24->isContentReleased(), false); + BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit + BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit + BOOST_CHECK_EQUAL(5u, tq9->getMessageCount()); +} + + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp index fabb01b864..6b44d95baa 100644 --- a/cpp/src/tests/TxPublishTest.cpp +++ b/cpp/src/tests/TxPublishTest.cpp @@ -50,7 +50,7 @@ struct TxPublishTest TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), - msg(MessageUtils::createMessage("exchange", "routing_key", "id")), + msg(MessageUtils::createMessage("exchange", "routing_key", false, "id")), op(msg) { msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); |