diff options
author | Alan Conway <aconway@apache.org> | 2012-02-17 14:17:39 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-17 14:17:39 +0000 |
commit | ad62fb6af2adf01d4815bf2612c1a605ecd2d200 (patch) | |
tree | 5830eef86e0d01f2214ff6f8965415a92b990e27 | |
parent | 6677842dee21d086dc030f5839fbad98bad01de2 (diff) | |
download | qpid-python-ad62fb6af2adf01d4815bf2612c1a605ecd2d200.tar.gz |
QPID-3603: Only ReplicatingSubscriptions can browse acquired messages.
Previously any subscription would browse acquired messages, which is
unexpected behavior.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245558 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/FifoDistributor.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 5 |
5 files changed, 16 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index b3d6f23732..682c75ed4f 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -64,6 +64,11 @@ class Consumer virtual OwnershipToken* getSession() = 0; virtual void cancel() = 0; + /** Returns true if the browser wants acquired as well as + * available messages. + */ + virtual bool browseAcquired() const { return false; }; + /** Called when the peer has acknowledged receipt of the message. * Not to be confused with accept() above, which is asking if * this consumer will consume/browse the message. diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp index 074c2b9a9d..c9ba894297 100644 --- a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp +++ b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp @@ -42,7 +42,7 @@ bool FifoDistributor::allocate(const std::string&, const QueuedMessage& ) bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { - return messages.browse(c->getPosition(), next, false); + return messages.browse(c->getPosition(), next, !c->browseAcquired()); } void FifoDistributor::query(qpid::types::Variant::Map&) const diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index e8571cf871..af6180305d 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -287,6 +287,7 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { re void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } +bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); } OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index fa2093ac61..e311f9505a 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -86,6 +86,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, // Consumer overrides. void cancel(); void acknowledged(const broker::QueuedMessage&); + bool browseAcquired() const { return true; } bool hideDeletedError(); @@ -118,6 +119,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, bool accept(boost::intrusive_ptr<broker::Message>); void cancel() {} void acknowledged(const broker::QueuedMessage&) {} + bool browseAcquired() const; broker::OwnershipToken* getSession(); diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index d2de384f08..36f5b24c02 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -883,16 +883,20 @@ class DtxTests(BrokerTest): t5.send(["1", "2"]) # Accept messages in a transaction before/after join then commit + # Note: Message sent outside transaction, we're testing transactional acceptance. t6 = DtxTestFixture(self, cluster[0], "t6") t6.send(["a","b","c"]) t6.start() self.assertEqual(t6.accept().body, "a"); + t6.verify(sessions, ["b", "c"]) # Accept messages in a transaction before/after join then roll back + # Note: Message sent outside transaction, we're testing transactional acceptance. t7 = DtxTestFixture(self, cluster[0], "t7") t7.send(["a","b","c"]) t7.start() self.assertEqual(t7.accept().body, "a"); + t7.verify(sessions, ["b", "c"]) # Ended, suspended transactions across join. t8 = DtxTestFixture(self, cluster[0], "t8") @@ -948,6 +952,7 @@ class DtxTests(BrokerTest): # Rollback t7 self.assertEqual(t7.accept().body, "b"); + t7.verify(sessions, ["c"]) t7.end() t7.rollback() t7.verify(sessions, ["a", "b", "c"]) |