summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2009-09-22 17:36:01 +0000
committerKim van der Riet <kpvdr@apache.org>2009-09-22 17:36:01 +0000
commitf13c131456c4fb0a027513242f2d8253aad75ca0 (patch)
tree5ccd77aeb648695b41c0444e44ffa039e012d124
parent3d3fb015b49b088a6e1f641437cd6b7acb0ed6ec (diff)
downloadqpid-python-f13c131456c4fb0a027513242f2d8253aad75ca0.tar.gz
Joint checkin from gsim, kpvdr, cctrieloff. See QPID-2102: Exceeding reject queue policy under a transaction causes broker crash
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@817742 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/DtxAck.cpp22
-rw-r--r--cpp/src/qpid/broker/Message.cpp19
-rw-r--r--cpp/src/qpid/broker/Message.h9
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp3
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp31
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h15
-rw-r--r--cpp/src/qpid/broker/Queue.cpp61
-rw-r--r--cpp/src/qpid/broker/Queue.h5
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp141
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.h30
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp3
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp8
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp54
-rw-r--r--cpp/src/qpid/broker/TxPublish.h14
-rw-r--r--cpp/src/tests/ExchangeTest.cpp2
-rw-r--r--cpp/src/tests/MessageUtils.h5
-rw-r--r--cpp/src/tests/QueuePolicyTest.cpp58
-rw-r--r--cpp/src/tests/QueueTest.cpp341
-rw-r--r--cpp/src/tests/TxPublishTest.cpp2
19 files changed, 631 insertions, 192 deletions
diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp
index b189ef4cdb..bca3f90bbe 100644
--- a/cpp/src/qpid/broker/DtxAck.cpp
+++ b/cpp/src/qpid/broker/DtxAck.cpp
@@ -48,12 +48,26 @@ bool DtxAck::prepare(TransactionContext* ctxt) throw()
void DtxAck::commit() throw()
{
- for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed));
- pending.clear();
+ try {
+ for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed));
+ pending.clear();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to commit (unknown error)");
+ }
+
}
void DtxAck::rollback() throw()
{
- for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue));
- pending.clear();
+ try {
+ for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue));
+ pending.clear();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to complete rollback: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to complete rollback (unknown error)");
+ }
+
}
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 7360010192..ae69a429c8 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -181,11 +181,23 @@ void Message::decodeContent(framing::Buffer& buffer)
loaded = true;
}
-void Message::releaseContent(MessageStore* _store)
+void Message::tryReleaseContent()
{
- if (!store) {
- store = _store;
+ if (checkContentReleasable()) {
+ releaseContent();
}
+}
+
+void Message::releaseContent(MessageStore* s)
+{
+ //deprecated, use setStore(store); releaseContent(); instead
+ if (!store) setStore(s);
+ releaseContent();
+}
+
+void Message::releaseContent()
+{
+ sys::Mutex::ScopedLock l(lock);
if (store) {
if (!getPersistenceId()) {
intrusive_ptr<PersistableMessage> pmsg(this);
@@ -234,6 +246,7 @@ bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxC
void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
{
+ sys::Mutex::ScopedLock l(lock);
if (isContentReleased() && !frames.isComplete()) {
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index e4d09b1042..3845146955 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -129,12 +129,9 @@ public:
QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- void releaseContent(MessageStore* store);
+ void tryReleaseContent();
+ void releaseContent();
+ void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead
void destroy();
bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const;
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 14b233fd6c..b1a2b77b05 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -80,7 +80,7 @@ void MessageBuilder::handle(AMQFrame& frame)
&& !NullMessageStore::isNullStore(store)
&& message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
{
- message->releaseContent(store);
+ message->releaseContent();
staging = true;
}
}
@@ -96,6 +96,7 @@ void MessageBuilder::end()
void MessageBuilder::start(const SequenceNumber& id)
{
message = intrusive_ptr<Message>(new Message(id));
+ message->setStore(store);
state = METHOD;
staging = false;
}
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 2ef223aa81..303a0501f4 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -36,7 +36,6 @@ PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
asyncEnqueueCounter(0),
asyncDequeueCounter(0),
- contentReleased(false),
store(0)
{}
@@ -59,9 +58,15 @@ void PersistableMessage::flush()
}
}
-void PersistableMessage::setContentReleased() {contentReleased = true; }
+void PersistableMessage::setContentReleased()
+{
+ contentReleaseState.released = true;
+}
-bool PersistableMessage::isContentReleased()const { return contentReleased; }
+bool PersistableMessage::isContentReleased() const
+{
+ return contentReleaseState.released;
+}
bool PersistableMessage::isEnqueueComplete() {
sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
@@ -153,6 +158,26 @@ void PersistableMessage::dequeueAsync() {
asyncDequeueCounter++;
}
+PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
+
+void PersistableMessage::setStore(MessageStore* s)
+{
+ store = s;
+}
+
+void PersistableMessage::requestContentRelease()
+{
+ contentReleaseState.requested = true;
+}
+void PersistableMessage::blockContentRelease()
+{
+ contentReleaseState.blocked = true;
+}
+bool PersistableMessage::checkContentReleasable()
+{
+ return contentReleaseState.requested && !contentReleaseState.blocked;
+}
+
}}
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 0274b41375..a968480249 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -68,8 +68,16 @@ class PersistableMessage : public Persistable
void enqueueAsync();
void dequeueAsync();
- bool contentReleased;
syncList synclist;
+ struct ContentReleaseState
+ {
+ bool blocked;
+ bool requested;
+ bool released;
+
+ ContentReleaseState();
+ };
+ ContentReleaseState contentReleaseState;
protected:
/** Called when all enqueues are complete for this message. */
@@ -97,6 +105,11 @@ class PersistableMessage : public Persistable
void flush();
bool isContentReleased() const;
+
+ void setStore(MessageStore*);
+ void requestContentRelease();
+ void blockContentRelease();
+ bool checkContentReleasable();
QPID_BROKER_EXTERN bool isEnqueueComplete();
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b2a8e223c5..1cc48a949e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -181,6 +181,8 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
void Queue::recover(boost::intrusive_ptr<Message>& msg){
+ if (policy.get()) policy->recoverEnqueued(msg);
+
push(msg, true);
if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
@@ -563,15 +565,12 @@ void Queue::popMsg(QueuedMessage& qmsg)
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
- Messages dequeues;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (policy.get()) {
- policy->tryEnqueue(qm);
- //depending on policy, may have some dequeues
- if (!isRecovery) pendingDequeues.swap(dequeues);
+ policy->enqueued(qm);
}
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
@@ -608,10 +607,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
}
copy.notify();
- if (!dequeues.empty()) {
- //depending on policy, may have some dequeues
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
- }
}
QueuedMessage Queue::getFront()
@@ -697,8 +692,12 @@ void Queue::setLastNodeFailure()
}
// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
{
+ if (policy.get() && !suppressPolicyCheck) {
+ policy->tryEnqueue(msg);
+ }
+
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
@@ -713,9 +712,21 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
store->enqueue(ctxt, pmsg, *this);
return true;
}
+ if (!store) {
+ //Messages enqueued on a transient queue should be prevented
+ //from having their content released as it may not be
+ //recoverable by these queue for delivery
+ msg->blockContentRelease();
+ }
return false;
}
+void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (policy.get()) policy->enqueueAborted(msg);
+}
+
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
@@ -781,7 +792,15 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings, bool recovering)
{
- setPolicy(QueuePolicy::createQueuePolicy(_settings));
+ if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+ (!store || NullMessageStore::isNullStore(store))) {
+ QPID_LOG(warning, "Flow to disk not valid for non-persisted queue");
+ FieldTable copy(_settings);
+ copy.erase(QueuePolicy::typeKey);
+ setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
+ } else {
+ setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
+ }
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
@@ -975,19 +994,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
-bool Queue::releaseMessageContent(const QueuedMessage& m)
-{
- if (store && !NullMessageStore::isNullStore(store)) {
- QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
- m.payload->releaseContent(store);
- return true;
- } else {
- QPID_LOG(warning, "Message " << m.position << " on " << name
- << " cannot be released from memory as the queue is not durable");
- return false;
- }
-}
-
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
@@ -1044,11 +1050,12 @@ void Queue::insertSequenceNumbers(const std::string& key)
void Queue::enqueued(const QueuedMessage& m)
{
if (m.payload) {
- if (policy.get()) policy->tryEnqueue(m);
- mgntEnqStats(m.payload);
- if (m.payload->isPersistent()) {
- enqueue ( 0, m.payload );
+ if (policy.get()) {
+ policy->recoverEnqueued(m.payload);
+ policy->enqueued(m);
}
+ mgntEnqStats(m.payload);
+ enqueue ( 0, m.payload, true );
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 77799fd967..9ac5e3f5e9 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -239,7 +239,8 @@ namespace qpid {
QPID_BROKER_EXTERN void setLastNodeFailure();
QPID_BROKER_EXTERN void clearLastNodeFailure();
- bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+ bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck = false);
+ void enqueueAborted(boost::intrusive_ptr<Message> msg);
/**
* dequeue from store (only done once messages is acknowledged)
*/
@@ -315,8 +316,6 @@ namespace qpid {
bindings.eachBinding(f);
}
- bool releaseMessageContent(const QueuedMessage&);
-
void popMsg(QueuedMessage& qmsg);
/** Set the position sequence number for the next message on the queue.
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 39afe90134..0f1f7f370f 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -28,8 +28,8 @@
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
+QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {}
void QueuePolicy::enqueued(uint64_t _size)
{
@@ -39,18 +39,15 @@ void QueuePolicy::enqueued(uint64_t _size)
void QueuePolicy::dequeued(uint64_t _size)
{
- //Note: underflow detection is not reliable in the face of
- //concurrent updates (at present locking in Queue.cpp prevents
- //these anyway); updates are atomic and are safe regardless.
if (maxCount) {
- if (count.get() > 0) {
+ if (count > 0) {
--count;
} else {
throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
}
}
if (maxSize) {
- if (_size > size.get()) {
+ if (_size > size) {
throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
} else {
size -= _size;
@@ -58,49 +55,53 @@ void QueuePolicy::dequeued(uint64_t _size)
}
}
-bool QueuePolicy::checkLimit(const QueuedMessage& m)
+bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- bool sizeExceeded = maxSize && (size.get() + m.payload->contentSize()) > maxSize;
- bool countExceeded = maxCount && (count.get() + 1) > maxCount;
+ bool sizeExceeded = maxSize && (size + m->contentSize()) > maxSize;
+ bool countExceeded = maxCount && (count + 1) > maxCount;
bool exceeded = sizeExceeded || countExceeded;
if (exceeded) {
if (!policyExceeded) {
- policyExceeded = true;
- if (m.queue) {
- if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << m.queue->getName());
- if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << m.queue->getName());
- }
+ policyExceeded = true;
+ if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << name);
+ if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << name);
}
} else {
if (policyExceeded) {
policyExceeded = false;
- if (m.queue) {
- QPID_LOG(info, "Queue cumulative message size and message count within policy for " << m.queue->getName());
- }
+ QPID_LOG(info, "Queue cumulative message size and message count within policy for " << name);
}
}
return !exceeded;
}
-void QueuePolicy::tryEnqueue(const QueuedMessage& m)
+void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
{
+ qpid::sys::Mutex::ScopedLock l(lock);
if (checkLimit(m)) {
- enqueued(m);
+ enqueued(m->contentSize());
} else {
- std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
- throw ResourceLimitExceededException(
- QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
- << " of size " << m.payload->contentSize() << " , policy: " << *this));
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded on " << name << ", policy: " << *this));
}
}
-void QueuePolicy::enqueued(const QueuedMessage& m)
+void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ enqueued(m->contentSize());
+}
+
+void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
{
- enqueued(m.payload->contentSize());
+ qpid::sys::Mutex::ScopedLock l(lock);
+ dequeued(m->contentSize());
}
+void QueuePolicy::enqueued(const QueuedMessage&) {}
+
void QueuePolicy::dequeued(const QueuedMessage& m)
{
+ qpid::sys::Mutex::ScopedLock l(lock);
dequeued(m.payload->contentSize());
}
@@ -132,7 +133,7 @@ std::string QueuePolicy::getType(const FieldTable& settings)
std::transform(t.begin(), t.end(), t.begin(), tolower);
if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
}
- return FLOW_TO_DISK;
+ return REJECT;
}
void QueuePolicy::setDefaultMaxSize(uint64_t s)
@@ -148,8 +149,8 @@ void QueuePolicy::encode(Buffer& buffer) const
{
buffer.putLong(maxCount);
buffer.putLongLong(maxSize);
- buffer.putLong(count.get());
- buffer.putLongLong(size.get());
+ buffer.putLong(count);
+ buffer.putLongLong(size);
}
void QueuePolicy::decode ( Buffer& buffer )
@@ -179,16 +180,18 @@ const std::string QueuePolicy::RING("ring");
const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
-FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
- QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
+FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) :
+ QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {}
-bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
+bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+ if (!QueuePolicy::checkLimit(m)) m->requestContentRelease();
+ return true;
}
-RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+RingQueuePolicy::RingQueuePolicy(const std::string& _name,
+ uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
bool before(const QueuedMessage& a, const QueuedMessage& b)
{
@@ -197,7 +200,6 @@ bool before(const QueuedMessage& a, const QueuedMessage& b)
void RingQueuePolicy::enqueued(const QueuedMessage& m)
{
- QueuePolicy::enqueued(m);
qpid::sys::Mutex::ScopedLock l(lock);
//need to insert in correct location based on position
queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
@@ -222,42 +224,28 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
return find(m, pendingDequeues, false) || find(m, queue, false);
}
-bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
+bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
QueuedMessage oldest;
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- if (queue.empty()) {
- QPID_LOG(debug, "Message too large for ring queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << " [" << *this << "] "
- << ": message size = " << m.payload->contentSize() << " bytes");
- return false;
- }
- oldest = queue.front();
+ if (queue.empty()) {
+ QPID_LOG(debug, "Message too large for ring queue " << name
+ << " [" << *this << "] "
+ << ": message size = " << m->contentSize() << " bytes");
+ return false;
}
+ oldest = queue.front();
if (oldest.queue->acquire(oldest) || !strict) {
- {
- //TODO: fix this! In the current code, this method is
- //only ever called with the Queue lock already taken. This
- //should not be relied upon going forward however and
- //clearly the locking in this class is insufficient as
- //there is no guarantee that the message previously atthe
- //front is still there.
- qpid::sys::Mutex::ScopedLock l(lock);
- queue.pop_front();
- pendingDequeues.push_back(oldest);
- }
- oldest.queue->addPendingDequeue(oldest);
- QPID_LOG(debug, "Ring policy triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << ": removed message " << oldest.position << " to make way for " << m.position);
+ queue.pop_front();
+ pendingDequeues.push_back(oldest);
+ QPID_LOG(debug, "Ring policy triggered in " << name
+ << ": removed message " << oldest.position << " to make way for new message");
+ qpid::sys::Mutex::ScopedUnlock u(lock);
+ oldest.queue->dequeue(0, oldest);
return true;
} else {
- QPID_LOG(debug, "Ring policy could not be triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ QPID_LOG(debug, "Ring policy could not be triggered in " << name
<< ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
//in strict mode, if oldest message has been delivered (hence
//cannot be acquired) but not yet acked, it should not be
@@ -277,25 +265,36 @@ bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
return false;
}
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+{
+ return createQueuePolicy("<unspecified>", maxCount, maxSize, type);
+}
+
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
{
+ return createQueuePolicy("<unspecified>", settings);
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
+{
uint32_t maxCount = getInt(settings, maxCountKey, 0);
uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize);
if (maxCount || maxSize) {
- return createQueuePolicy(maxCount, maxSize, getType(settings));
+ return createQueuePolicy(name, maxCount, maxSize, getType(settings));
} else {
return std::auto_ptr<QueuePolicy>();
}
}
-std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name,
+ uint32_t maxCount, uint64_t maxSize, const std::string& type)
{
if (type == RING || type == RING_STRICT) {
- return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type));
} else if (type == FLOW_TO_DISK) {
- return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize));
+ return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize));
} else {
- return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type));
}
}
@@ -305,10 +304,10 @@ namespace qpid {
std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
{
- if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get();
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
else out << "size: unlimited";
out << "; ";
- if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
else out << "count: unlimited";
out << "; type=" << p.type;
return out;
diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h
index 54745876d5..65c52304f2 100644
--- a/cpp/src/qpid/broker/QueuePolicy.h
+++ b/cpp/src/qpid/broker/QueuePolicy.h
@@ -40,12 +40,11 @@ class QueuePolicy
uint32_t maxCount;
uint64_t maxSize;
const std::string type;
- qpid::sys::AtomicValue<uint32_t> count;
- qpid::sys::AtomicValue<uint64_t> size;
+ uint32_t count;
+ uint64_t size;
bool policyExceeded;
static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
- static std::string getType(const qpid::framing::FieldTable& settings);
public:
static QPID_BROKER_EXTERN const std::string maxCountKey;
@@ -57,10 +56,12 @@ class QueuePolicy
static QPID_BROKER_EXTERN const std::string RING_STRICT;
virtual ~QueuePolicy() {}
- QPID_BROKER_EXTERN void tryEnqueue(const QueuedMessage&);
+ QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
+ virtual void enqueued(const QueuedMessage&);
virtual void dequeued(const QueuedMessage&);
virtual bool isEnqueued(const QueuedMessage&);
- virtual bool checkLimit(const QueuedMessage&);
QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings);
uint32_t getMaxCount() const { return maxCount; }
uint64_t getMaxSize() const { return maxSize; }
@@ -69,15 +70,21 @@ class QueuePolicy
uint32_t encodedSize() const;
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ static std::string getType(const qpid::framing::FieldTable& settings);
static void setDefaultMaxSize(uint64_t);
friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&,
const QueuePolicy&);
protected:
- QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ const std::string name;
+ qpid::sys::Mutex lock;
- virtual void enqueued(const QueuedMessage&);
+ QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+
+ virtual bool checkLimit(boost::intrusive_ptr<Message> msg);
void enqueued(uint64_t size);
void dequeued(uint64_t size);
};
@@ -86,21 +93,20 @@ class QueuePolicy
class FlowToDiskPolicy : public QueuePolicy
{
public:
- FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize);
- bool checkLimit(const QueuedMessage&);
+ FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
};
class RingQueuePolicy : public QueuePolicy
{
public:
- RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+ RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
bool isEnqueued(const QueuedMessage&);
- bool checkLimit(const QueuedMessage&);
+ bool checkLimit(boost::intrusive_ptr<Message> msg);
private:
typedef std::deque<QueuedMessage> Messages;
- qpid::sys::Mutex lock;
Messages pendingDequeues;
Messages queue;
const bool strict;
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index bdd5f33601..a59d29c3cc 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -356,6 +356,9 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
} else {
DeliverableMessage deliverable(msg);
route(msg, deliverable);
+ if (msg->checkContentReleasable()) {
+ msg->releaseContent();
+ }
}
}
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index e47ac84990..928ac12c10 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -88,7 +88,13 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw()
void TxAccept::commit() throw()
{
- ops.commit();
+ try {
+ ops.commit();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to commit (unknown error)");
+ }
}
void TxAccept::rollback() throw() {}
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index 17b99fd883..4b083033ea 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -26,9 +26,14 @@ using namespace qpid::broker;
TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {}
-bool TxPublish::prepare(TransactionContext* ctxt) throw(){
+bool TxPublish::prepare(TransactionContext* ctxt) throw()
+{
try{
- for_each(queues.begin(), queues.end(), Prepare(ctxt, msg));
+ while (!queues.empty()) {
+ prepare(ctxt, queues.front());
+ prepared.push_back(queues.front());
+ queues.pop_front();
+ }
return true;
}catch(const std::exception& e){
QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -38,11 +43,30 @@ bool TxPublish::prepare(TransactionContext* ctxt) throw(){
return false;
}
-void TxPublish::commit() throw(){
- for_each(queues.begin(), queues.end(), Commit(msg));
+void TxPublish::commit() throw()
+{
+ try {
+ for_each(prepared.begin(), prepared.end(), Commit(msg));
+ if (msg->checkContentReleasable()) {
+ msg->releaseContent();
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to commit: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to commit (unknown error)");
+ }
}
-void TxPublish::rollback() throw(){
+void TxPublish::rollback() throw()
+{
+ try {
+ for_each(prepared.begin(), prepared.end(), Rollback(msg));
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to complete rollback: " << e.what());
+ } catch(...) {
+ QPID_LOG(error, "Failed to complete rollback (unknown error)");
+ }
+
}
void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
@@ -54,16 +78,14 @@ void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
}
}
-TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg)
- : ctxt(_ctxt), msg(_msg){}
-
-void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){
+void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
+{
if (!queue->enqueue(ctxt, msg)){
/**
- * if not store then mark message for ack and deleivery once
- * commit happens, as async IO will never set it when no store
- * exists
- */
+ * if not store then mark message for ack and deleivery once
+ * commit happens, as async IO will never set it when no store
+ * exists
+ */
msg->enqueueComplete();
}
}
@@ -74,6 +96,12 @@ void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){
queue->process(msg);
}
+TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){}
+
+void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){
+ queue->enqueueAborted(msg);
+}
+
uint64_t TxPublish::contentSize ()
{
return msg->contentSize ();
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index d5cf5639c4..b6ab9767ab 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -47,23 +47,25 @@ namespace qpid {
* dispatch or to be added to the in-memory queue.
*/
class TxPublish : public TxOp, public Deliverable{
- class Prepare{
- TransactionContext* ctxt;
+
+ class Commit{
boost::intrusive_ptr<Message>& msg;
public:
- Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg);
+ Commit(boost::intrusive_ptr<Message>& msg);
void operator()(const boost::shared_ptr<Queue>& queue);
};
-
- class Commit{
+ class Rollback{
boost::intrusive_ptr<Message>& msg;
public:
- Commit(boost::intrusive_ptr<Message>& msg);
+ Rollback(boost::intrusive_ptr<Message>& msg);
void operator()(const boost::shared_ptr<Queue>& queue);
};
boost::intrusive_ptr<Message> msg;
std::list<Queue::shared_ptr> queues;
+ std::list<Queue::shared_ptr> prepared;
+
+ void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
public:
QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 44835c6184..88a1cd99c2 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -60,7 +60,7 @@ QPID_AUTO_TEST_CASE(testMe)
queue.reset();
queue2.reset();
- intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "key", "id"));
+ intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "key", false, "id"));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h
index dae74cce7d..a1b140d484 100644
--- a/cpp/src/tests/MessageUtils.h
+++ b/cpp/src/tests/MessageUtils.h
@@ -34,7 +34,8 @@ namespace tests {
struct MessageUtils
{
static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="",
- const Uuid& messageId=Uuid(true), uint64_t contentSize = 0)
+ const bool durable = false, const Uuid& messageId=Uuid(true),
+ uint64_t contentSize = 0)
{
boost::intrusive_ptr<broker::Message> msg(new broker::Message());
@@ -47,6 +48,8 @@ struct MessageUtils
props->setContentLength(contentSize);
props->setMessageId(messageId);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ if (durable)
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
return msg;
}
diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp
index f40d30b588..875976db85 100644
--- a/cpp/src/tests/QueuePolicyTest.cpp
+++ b/cpp/src/tests/QueuePolicyTest.cpp
@@ -48,56 +48,56 @@ QueuedMessage createMessage(uint32_t size)
QPID_AUTO_TEST_CASE(testCount)
{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 0));
+ std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 0));
BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
QueuedMessage msg = createMessage(10);
for (size_t i = 0; i < 5; i++) {
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
}
try {
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
BOOST_FAIL("Policy did not fail on enqueuing sixth message");
} catch (const ResourceLimitExceededException&) {}
policy->dequeued(msg);
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
try {
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)");
} catch (const ResourceLimitExceededException&) {}
}
QPID_AUTO_TEST_CASE(testSize)
{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(0, 50));
+ std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 0, 50));
QueuedMessage msg = createMessage(10);
for (size_t i = 0; i < 5; i++) {
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
}
try {
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
} catch (const ResourceLimitExceededException&) {}
policy->dequeued(msg);
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
try {
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy);
} catch (const ResourceLimitExceededException&) {}
}
QPID_AUTO_TEST_CASE(testBoth)
{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 50));
+ std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 50));
try {
QueuedMessage msg = createMessage(51);
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy);
} catch (const ResourceLimitExceededException&) {}
@@ -108,17 +108,17 @@ QPID_AUTO_TEST_CASE(testBoth)
messages.push_back(createMessage(2));
messages.push_back(createMessage(7));
for (size_t i = 0; i < messages.size(); i++) {
- policy->tryEnqueue(messages[i]);
+ policy->tryEnqueue(messages[i].payload);
}
//size = 45 at this point, count = 5
try {
QueuedMessage msg = createMessage(5);
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy);
} catch (const ResourceLimitExceededException&) {}
try {
QueuedMessage msg = createMessage(10);
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
} catch (const ResourceLimitExceededException&) {}
@@ -126,7 +126,7 @@ QPID_AUTO_TEST_CASE(testBoth)
policy->dequeued(messages[0]);
try {
QueuedMessage msg = createMessage(20);
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg.payload);
} catch (const ResourceLimitExceededException&) {
BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy);
}
@@ -135,10 +135,10 @@ QPID_AUTO_TEST_CASE(testBoth)
QPID_AUTO_TEST_CASE(testSettings)
{
//test reading and writing the policy from/to field table
- std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy(101, 303));
+ std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy("test", 101, 303));
FieldTable settings;
a->update(settings);
- std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy(settings));
+ std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy("test", settings));
BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount());
BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
}
@@ -146,7 +146,7 @@ QPID_AUTO_TEST_CASE(testSettings)
QPID_AUTO_TEST_CASE(testRingPolicy)
{
FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::RING);
+ std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
policy->update(args);
ProxySessionFixture f;
@@ -175,7 +175,7 @@ QPID_AUTO_TEST_CASE(testRingPolicy)
QPID_AUTO_TEST_CASE(testStrictRingPolicy)
{
FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::RING_STRICT);
+ std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
policy->update(args);
ProxySessionFixture f;
@@ -201,7 +201,7 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy)
QPID_AUTO_TEST_CASE(testPolicyWithDtx)
{
FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::REJECT);
+ std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
policy->update(args);
ProxySessionFixture f;
@@ -282,6 +282,22 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore)
} catch (const ResourceLimitExceededException&) {}
}
+QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit)
+{
+ FieldTable args;
+ std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
+ policy->update(args);
+
+ ProxySessionFixture f;
+ std::string q("q");
+ f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
+ f.session.txSelect();
+ for (int i = 0; i < 10; i++) {
+ f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ ScopedSuppressLogging sl; // Suppress messages for expected errors.
+ BOOST_CHECK_THROW(f.session.txCommit(), InternalErrorException);
+}
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 841a19f7c1..756f4ac3f6 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -18,10 +18,13 @@
* under the License.
*
*/
+#include "MessageUtils.h"
#include "unit_test.h"
#include "test_tools.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -30,12 +33,16 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include <iostream>
#include "boost/format.hpp"
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
+using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -61,13 +68,14 @@ public:
class FailOnDeliver : public Deliverable
{
- Message msg;
+ boost::intrusive_ptr<Message> msg;
public:
+ FailOnDeliver() : msg(MessageUtils::createMessage()) {}
void deliverTo(const boost::shared_ptr<Queue>& queue)
{
throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
}
- Message& getMessage() { return msg; }
+ Message& getMessage() { return *(msg.get()); }
};
intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) {
@@ -210,8 +218,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
}
-QPID_AUTO_TEST_CASE(testBound)
-{
+QPID_AUTO_TEST_CASE(testBound){
//test the recording of bindings, and use of those to allow a queue to be unbound
string key("my-key");
FieldTable args;
@@ -245,7 +252,6 @@ QPID_AUTO_TEST_CASE(testBound)
}
QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
-
client::QueueOptions args;
args.setPersistLastNode();
@@ -273,14 +279,35 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
}
-class TestMessageStoreOC : public NullMessageStore
+const std::string nullxid = "";
+
+class SimpleDummyCtxt : public TransactionContext {};
+
+class DummyCtxt : public TPCTransactionContext
+{
+ const std::string xid;
+ public:
+ DummyCtxt(const std::string& _xid) : xid(_xid) {}
+ static std::string getXid(TransactionContext& ctxt)
+ {
+ DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
+ return c ? c->xid : nullxid;
+ }
+};
+
+class TestMessageStoreOC : public MessageStore
{
+ std::set<std::string> prepared;
+ uint64_t nextPersistenceId;
public:
uint enqCnt;
uint deqCnt;
bool error;
+ TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
+ ~TestMessageStoreOC(){}
+
virtual void dequeue(TransactionContext*,
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
const PersistableQueue& /*queue*/)
@@ -290,11 +317,12 @@ class TestMessageStoreOC : public NullMessageStore
}
virtual void enqueue(TransactionContext*,
- const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& /* queue */)
{
if (error) throw Exception("Enqueue error test");
enqCnt++;
+ msg->enqueueComplete();
}
void createError()
@@ -302,8 +330,32 @@ class TestMessageStoreOC : public NullMessageStore
error=true;
}
- TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {}
- ~TestMessageStoreOC(){}
+ bool init(const Options*) { return true; }
+ void truncateInit(const bool) {}
+ void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
+ void destroy(PersistableQueue&) {}
+ void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableExchange&) {}
+ void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableConfig&) {}
+ void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
+ void destroy(PersistableMessage&) {}
+ void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
+ void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
+ std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
+ void flush(const qpid::broker::PersistableQueue&) {}
+ uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
+
+ std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
+ std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
+ void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
+ void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
+
+ void recover(RecoveryManager&) {}
};
@@ -703,7 +755,7 @@ not requeued to the store.
QPID_AUTO_TEST_CASE(testLastNodeJournalError){
/*
-simulate store excption going into last node standing
+simulate store exception going into last node standing
*/
TestMessageStoreOC testStore;
@@ -727,16 +779,271 @@ simulate store excption going into last node standing
}
-intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) {
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+intrusive_ptr<Message> mkMsg(MessageStore& store, std::string content = "", bool durable = false)
+{
+ intrusive_ptr<Message> msg = MessageUtils::createMessage("", "", durable);
+ if (content.size()) MessageUtils::addContent(msg, content);
+ msg->setStore(&store);
return msg;
}
+QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
+
+ TestMessageStoreOC testStore;
+ client::QueueOptions args0; // No size policy
+ client::QueueOptions args1;
+ args1.setSizePolicy(FLOW_TO_DISK, 0, 1);
+ client::QueueOptions args2;
+ args2.setSizePolicy(FLOW_TO_DISK, 0, 2);
+
+ // --- Fanout exchange bound to single transient queue -------------------------------------------------------------
+
+ FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue
+ Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit
+ tq1->configure(args1);
+ sbtFanout1.bind(tq1, "", 0);
+
+ intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg01(msg01);
+ sbtFanout1.route(dmsg01, "", 0); // Brings queue 1 to capacity limit
+ msg01->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg01->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg02(msg02);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ msg02->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // transient w/ content
+ DeliverableMessage dmsg03(msg03);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ msg03->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg04(msg04);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ msg04->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg05(msg05);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ msg05->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
+
+ // --- Fanout exchange bound to single durable queue ---------------------------------------------------------------
+
+ FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue
+ Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit
+ dq2->configure(args1);
+ sbdFanout2.bind(dq2, "", 0);
+
+ intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg06(msg06);
+ sbdFanout2.route(dmsg06, "", 0); // Brings queue 2 to capacity limit
+ msg06->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg06->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, dq2->getMessageCount());
+
+ intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg07(msg07);
+ sbdFanout2.route(dmsg07, "", 0);
+ msg07->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg07->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(2u, dq2->getMessageCount());
+
+ intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
+ DeliverableMessage dmsg08(msg08);
+ sbdFanout2.route(dmsg08, "", 0);
+ msg08->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg08->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(3u, dq2->getMessageCount());
+
+ intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg09(msg09);
+ sbdFanout2.route(dmsg09, "", 0);
+ msg09->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg09->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(4u, dq2->getMessageCount());
+
+ intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg10(msg10);
+ sbdFanout2.route(dmsg10, "", 0);
+ msg10->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg10->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(5u, dq2->getMessageCount());
+
+ // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------
+
+ FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues
+ Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2
+ dq3->configure(args2);
+ mbdFanout3.bind(dq3, "", 0);
+ Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1
+ dq4->configure(args1);
+ mbdFanout3.bind(dq4, "", 0);
+ Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit
+ dq5->configure(args0);
+ mbdFanout3.bind(dq5, "", 0);
+
+ intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg11(msg11);
+ mbdFanout3.route(dmsg11, "", 0); // Brings queues 3 and 4 to capacity limit
+ msg11->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg11->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(1u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(1u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg12(msg12);
+ mbdFanout3.route(dmsg12, "", 0);
+ msg12->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg12->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(2u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(2u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(2u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
+ DeliverableMessage dmsg13(msg13);
+ mbdFanout3.route(dmsg13, "", 0);
+ msg13->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg13->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(3u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(3u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(3u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg14(msg14);
+ mbdFanout3.route(dmsg14, "", 0);
+ msg14->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg14->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(4u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(4u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(4u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg15(msg15);
+ mbdFanout3.route(dmsg15, "", 0);
+ msg15->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg15->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(5u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(5u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(5u, dq5->getMessageCount());
+
+ // Bind a transient queue, this should block the release of any further messages.
+ // Note: this will result in a violation of the count policy of dq3 and dq4 - but this
+ // is expected until a better overall multi-queue design is implemented. Similarly
+ // for the other tests in this section.
+
+ Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit
+ tq6->configure(args0);
+ mbdFanout3.bind(tq6, "", 0);
+
+ intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg16(msg16);
+ mbdFanout3.route(dmsg16, "", 0);
+ msg16->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg16->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(6u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(6u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(6u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
+ DeliverableMessage dmsg17(msg17);
+ mbdFanout3.route(dmsg17, "", 0);
+ msg17->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg17->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(7u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(7u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(7u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg18(msg18);
+ mbdFanout3.route(dmsg18, "", 0);
+ msg18->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg18->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(8u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(8u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(8u, dq5->getMessageCount());
+
+ intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg19(msg19);
+ mbdFanout3.route(dmsg19, "", 0);
+ msg19->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg19->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(9u, dq3->getMessageCount());
+ BOOST_CHECK_EQUAL(9u, dq4->getMessageCount());
+ BOOST_CHECK_EQUAL(9u, dq5->getMessageCount());
+
+
+ // --- Fanout exchange bound to multiple durable and transient queues ----------------------------------------------
+
+ FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues
+ Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit
+ dq7->configure(args0);
+ mbmFanout4.bind(dq7, "", 0);
+ Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit
+ dq8->configure(args1);
+ mbmFanout4.bind(dq8, "", 0);
+ Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit
+ tq9->configure(args0);
+ mbmFanout4.bind(tq9, "", 0);
+
+ intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg20(msg20);
+ mbmFanout4.route(dmsg20, "", 0); // Brings queue 7 to capacity limit
+ msg20->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg20->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(1u, dq7->getMessageCount());
+ BOOST_CHECK_EQUAL(1u, dq8->getMessageCount());
+ BOOST_CHECK_EQUAL(1u, tq9->getMessageCount());
+
+ intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
+ DeliverableMessage dmsg21(msg21);
+ mbmFanout4.route(dmsg21, "", 0);
+ msg21->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg21->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(2u, dq8->getMessageCount());
+ BOOST_CHECK_EQUAL(2u, tq9->getMessageCount());
+
+ intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
+ DeliverableMessage dmsg22(msg22);
+ mbmFanout4.route(dmsg22, "", 0);
+ msg22->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg22->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(3u, tq9->getMessageCount());
+
+ intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content
+ DeliverableMessage dmsg23(msg23);
+ mbmFanout4.route(dmsg23, "", 0);
+ msg23->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg23->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(4u, tq9->getMessageCount());
+
+ intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content
+ DeliverableMessage dmsg24(msg24);
+ mbmFanout4.route(dmsg24, "", 0);
+ msg24->tryReleaseContent();
+ BOOST_CHECK_EQUAL(msg24->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit
+ BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
+}
+
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index fabb01b864..6b44d95baa 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -50,7 +50,7 @@ struct TxPublishTest
TxPublishTest() :
queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
- msg(MessageUtils::createMessage("exchange", "routing_key", "id")),
+ msg(MessageUtils::createMessage("exchange", "routing_key", false, "id")),
op(msg)
{
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);