summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:17:39 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:17:39 +0000
commitad62fb6af2adf01d4815bf2612c1a605ecd2d200 (patch)
tree5830eef86e0d01f2214ff6f8965415a92b990e27
parent6677842dee21d086dc030f5839fbad98bad01de2 (diff)
downloadqpid-python-ad62fb6af2adf01d4815bf2612c1a605ecd2d200.tar.gz
QPID-3603: Only ReplicatingSubscriptions can browse acquired messages.
Previously any subscription would browse acquired messages, which is unexpected behavior. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245558 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h9
-rw-r--r--qpid/cpp/src/qpid/broker/FifoDistributor.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py5
5 files changed, 16 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index b3d6f23732..682c75ed4f 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -64,6 +64,11 @@ class Consumer
virtual OwnershipToken* getSession() = 0;
virtual void cancel() = 0;
+ /** Returns true if the browser wants acquired as well as
+ * available messages.
+ */
+ virtual bool browseAcquired() const { return false; };
+
/** Called when the peer has acknowledged receipt of the message.
* Not to be confused with accept() above, which is asking if
* this consumer will consume/browse the message.
diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
index 074c2b9a9d..c9ba894297 100644
--- a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
+++ b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
@@ -42,7 +42,7 @@ bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- return messages.browse(c->getPosition(), next, false);
+ return messages.browse(c->getPosition(), next, !c->browseAcquired());
}
void FifoDistributor::query(qpid::types::Variant::Map&) const
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index e8571cf871..af6180305d 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -287,6 +287,7 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { re
void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); }
OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index fa2093ac61..e311f9505a 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -86,6 +86,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
// Consumer overrides.
void cancel();
void acknowledged(const broker::QueuedMessage&);
+ bool browseAcquired() const { return true; }
bool hideDeletedError();
@@ -118,6 +119,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
bool accept(boost::intrusive_ptr<broker::Message>);
void cancel() {}
void acknowledged(const broker::QueuedMessage&) {}
+ bool browseAcquired() const;
broker::OwnershipToken* getSession();
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index d2de384f08..36f5b24c02 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -883,16 +883,20 @@ class DtxTests(BrokerTest):
t5.send(["1", "2"])
# Accept messages in a transaction before/after join then commit
+ # Note: Message sent outside transaction, we're testing transactional acceptance.
t6 = DtxTestFixture(self, cluster[0], "t6")
t6.send(["a","b","c"])
t6.start()
self.assertEqual(t6.accept().body, "a");
+ t6.verify(sessions, ["b", "c"])
# Accept messages in a transaction before/after join then roll back
+ # Note: Message sent outside transaction, we're testing transactional acceptance.
t7 = DtxTestFixture(self, cluster[0], "t7")
t7.send(["a","b","c"])
t7.start()
self.assertEqual(t7.accept().body, "a");
+ t7.verify(sessions, ["b", "c"])
# Ended, suspended transactions across join.
t8 = DtxTestFixture(self, cluster[0], "t8")
@@ -948,6 +952,7 @@ class DtxTests(BrokerTest):
# Rollback t7
self.assertEqual(t7.accept().body, "b");
+ t7.verify(sessions, ["c"])
t7.end()
t7.rollback()
t7.verify(sessions, ["a", "b", "c"])