diff options
author | Gordon Sim <gsim@apache.org> | 2008-11-27 12:21:04 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-11-27 12:21:04 +0000 |
commit | 6b67c025160e2363052820b9f686bbf386d9c96a (patch) | |
tree | ca52122fc4ea53e6a93e2653f33c36f4a97d756a /cpp/src | |
parent | df072598b37b31e9a7cf72818c9aa87b2ee21f70 (diff) | |
download | qpid-python-6b67c025160e2363052820b9f686bbf386d9c96a.tar.gz |
* QPID-1488: test that policy pointer is set
* don't flow to disk for null store implementation
* add checks for undeflow in queue policy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@721166 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/IncompleteMessageList.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/tests/MessageBuilderTest.cpp | 7 |
8 files changed, 58 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp index 26c7a83d2f..2077e633ec 100644 --- a/cpp/src/qpid/broker/IncompleteMessageList.cpp +++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp @@ -58,7 +58,7 @@ void IncompleteMessageList::process(const CompletionListener& listen, bool sync) msg->flush(); // Can re-enter IncompleteMessageList::enqueueComplete } while (!msg->isEnqueueComplete()) - lock.wait(); + lock.wait(); } else { //leave the message as incomplete for now return; diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index eda71ed3da..8f0e3344d5 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -22,6 +22,7 @@ #include "Message.h" #include "MessageStore.h" +#include "NullMessageStore.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/reply_exceptions.h" @@ -72,7 +73,11 @@ void MessageBuilder::handle(AMQFrame& frame) } else { message->getFrames().append(frame); //have we reached the staging limit? if so stage message and release content - if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) { + if (state == CONTENT + && stagingThreshold + && message->getFrames().getContentSize() >= stagingThreshold + && !NullMessageStore::isNullStore(store)) + { message->releaseContent(store); staging = true; } diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 689ed15707..5a4b23217c 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -140,5 +140,15 @@ void NullMessageStore::collectPreparedXids(std::set<string>& out) out.insert(prepared.begin(), prepared.end()); } +bool NullMessageStore::isNull() const +{ + return true; +} + +bool NullMessageStore::isNullStore(const MessageStore* store) +{ + const NullMessageStore* test = dynamic_cast<const NullMessageStore*>(store); + return test && test->isNull(); +} }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index f384253de6..d99c751d26 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -76,6 +76,9 @@ class NullMessageStore : public MessageStore virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue); virtual void flush(const qpid::broker::PersistableQueue& queue); ~NullMessageStore(){} + + virtual bool isNull() const; + static bool isNullStore(const MessageStore*); }; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b1f9163bb5..a78744598c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -24,6 +24,7 @@ #include "Exchange.h" #include "DeliverableMessage.h" #include "MessageStore.h" +#include "NullMessageStore.h" #include "QueueRegistry.h" #include "qpid/StringUtils.h" @@ -741,12 +742,15 @@ void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); buffer.put(settings); - buffer.put(*policy); + if (policy.get()) { + buffer.put(*policy); + } } uint32_t Queue::encodedSize() const { - return name.size() + 1/*short string size octet*/ + settings.encodedSize() + (*policy).encodedSize(); + return name.size() + 1/*short string size octet*/ + settings.encodedSize() + + (policy.get() ? (*policy).encodedSize() : 0); } Queue::shared_ptr Queue::decode(QueueRegistry& queues, Buffer& buffer) @@ -756,7 +760,9 @@ Queue::shared_ptr Queue::decode(QueueRegistry& queues, Buffer& buffer) std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); buffer.get(result.first->settings); result.first->configure(result.first->settings); - buffer.get ( *(result.first->policy) ); + if (result.first->policy.get()) { + buffer.get ( *(result.first->policy) ); + } return result.first; } @@ -828,7 +834,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { bool Queue::releaseMessageContent(const QueuedMessage& m) { - if (store) { + if (store && !NullMessageStore::isNullStore(store)) { QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); m.payload->releaseContent(store); return true; diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 5c945d2c7f..41a6709d27 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -20,6 +20,7 @@ */ #include "QueuePolicy.h" #include "Queue.h" +#include "qpid/Exception.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" @@ -38,8 +39,23 @@ void QueuePolicy::enqueued(uint64_t _size) void QueuePolicy::dequeued(uint64_t _size) { - if (maxCount) --count; - if (maxSize) size -= _size; + //Note: underflow detection is not reliable in the face of + //concurrent updates (at present locking in Queue.cpp prevents + //these anyway); updates are atomic and are safe regardless. + if (maxCount) { + if (count.get() > 0) { + --count; + } else { + throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this)); + } + } + if (maxSize) { + if (_size > size.get()) { + throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this)); + } else { + size -= _size; + } + } } bool QueuePolicy::checkLimit(const QueuedMessage& m) diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 73dfc8cde8..d9896b388b 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -387,8 +387,9 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); if (!strategy.delivered) { - //TODO:if reject-unroutable, then reject - //else route to alternate exchange + //TODO:if discard-unroutable, just drop it + //TODO:else if accept-mode is explicit, reject it + //else route it to alternate exchange if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); } diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index 63c3a800de..313a91c053 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -82,6 +82,13 @@ class MockMessageStore : public NullMessageStore { return ops.empty(); } + + //don't treat this store as a null impl + bool isNull() const + { + return false; + } + }; QPID_AUTO_TEST_SUITE(MessageBuilderTestSuite) |