summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:00:38 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:00:38 +0000
commit26161156a17164d62010dd9d531a96906b73ce0e (patch)
treefcdb5c40a42565bbb393617098643771838d8c39
parent5ea482321f8ed09ab7c5fd26634d0174ad5ef6fd (diff)
downloadqpid-python-26161156a17164d62010dd9d531a96906b73ce0e.tar.gz
QPID-3603: Prototype of replicating browser.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233624 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--qpid/cpp/src/qpid/broker/FifoDistributor.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.h4
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.cpp160
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.h21
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.cpp36
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.h12
-rw-r--r--qpid/cpp/src/qpid/broker/Messages.h47
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp33
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp137
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h7
-rw-r--r--qpid/cpp/src/qpid/broker/QueueReplicator.cpp128
-rw-r--r--qpid/cpp/src/qpid/broker/QueueReplicator.h57
-rw-r--r--qpid/cpp/src/qpid/broker/QueuedMessage.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp271
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h21
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp2
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp8
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py6
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/message.py3
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py11
28 files changed, 760 insertions, 277 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index c5d2a45f69..d4596f55eb 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1007,6 +1007,7 @@ set (qpidbroker_SOURCES
qpid/broker/QueueListeners.cpp
qpid/broker/FifoDistributor.cpp
qpid/broker/MessageGroupManager.cpp
+ qpid/broker/QueueReplicator.cpp
qpid/broker/PersistableMessage.cpp
qpid/broker/Bridge.cpp
qpid/broker/Connection.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index fb26251da0..c1e42d382a 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -625,6 +625,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueuedMessage.h \
qpid/broker/QueueFlowLimit.h \
qpid/broker/QueueFlowLimit.cpp \
+ qpid/broker/QueueReplicator.h \
+ qpid/broker/QueueReplicator.cpp \
qpid/broker/RateFlowcontrol.h \
qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 12c2194381..340e53c542 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
@@ -96,8 +97,11 @@ void Bridge::create(Connection& c)
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
- if (args.i_srcIsQueue) {
- peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
+ if (args.i_srcIsQueue) {
+ //TODO: something other than this which is nasty...
+ bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options);
+
+ peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink ? 1 : 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 0b8fe95d5e..43ca1ae04b 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -37,7 +37,7 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
bool _acquired,
bool accepted,
bool _windowing,
- uint32_t _credit) : msg(_msg),
+ uint32_t _credit, bool _delayedCompletion) : msg(_msg),
queue(_queue),
tag(_tag),
acquired(_acquired),
@@ -46,7 +46,8 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
completed(false),
ended(accepted && acquired),
windowing(_windowing),
- credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
+ credit(msg.payload ? msg.payload->getRequiredCredit() : _credit),
+ delayedCompletion(_delayedCompletion)
{}
bool DeliveryRecord::setEnded()
@@ -111,8 +112,14 @@ void DeliveryRecord::complete() {
}
bool DeliveryRecord::accept(TransactionContext* ctxt) {
- if (acquired && !ended) {
- queue->dequeue(ctxt, msg);
+ if (!ended) {
+ if (acquired) {
+ queue->dequeue(ctxt, msg);
+ } else if (delayedCompletion) {
+ //TODO: this is a nasty way to do this; change it
+ msg.payload->getIngressCompletion().finishCompleter();
+ QPID_LOG(debug, "Completed " << msg.payload.get());
+ }
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index 5a331357be..90e72aaf0d 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -63,6 +63,7 @@ class DeliveryRecord
* after that).
*/
uint32_t credit;
+ bool delayedCompletion;
public:
QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -71,7 +72,8 @@ class DeliveryRecord
bool acquired,
bool accepted,
bool windowing,
- uint32_t credit=0 // Only used if msg is empty.
+ uint32_t credit=0, // Only used if msg is empty.
+ bool delayedCompletion=false
);
bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
index eb1f0a402e..074c2b9a9d 100644
--- a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
+++ b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
@@ -30,11 +30,7 @@ FifoDistributor::FifoDistributor(Messages& container)
bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next )
{
- if (!messages.empty()) {
- next = messages.front(); // by default, consume oldest msg
- return true;
- }
- return false;
+ return messages.consume(next);
}
bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
@@ -46,9 +42,7 @@ bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- if (!messages.empty() && messages.next(c->getPosition(), next))
- return true;
- return false;
+ return messages.browse(c->getPosition(), next, false);
}
void FifoDistributor::query(qpid::types::Variant::Map&) const
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
index 3262e343a3..49c0a32c19 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -32,7 +32,7 @@ void LegacyLVQ::setNoBrowse(bool b)
noBrowse = b;
}
-bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
if (i != messages.end() && i->second.payload == message.payload) {
@@ -44,9 +44,9 @@ bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& m
}
}
-bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message)
+bool LegacyLVQ::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- if (MessageMap::next(position, message)) {
+ if (MessageMap::browse(position, message, unacquired)) {
if (!noBrowse) index.erase(getKey(message));
return true;
} else {
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
index dd0fd7aaec..695e51131d 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
@@ -40,8 +40,8 @@ class LegacyLVQ : public MessageMap
{
public:
LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
- bool remove(const framing::SequenceNumber&, QueuedMessage&);
- bool next(const framing::SequenceNumber&, QueuedMessage&);
+ bool acquire(const framing::SequenceNumber&, QueuedMessage&);
+ bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void removeIf(Predicate);
void setNoBrowse(bool);
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
index 24b8f6f895..c9e91495c8 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
@@ -20,121 +20,155 @@
*/
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
-size_t MessageDeque::size()
-{
- return messages.size();
-}
-
-bool MessageDeque::empty()
-{
- return messages.empty();
-}
+MessageDeque::MessageDeque() : available(0), head(0) {}
-void MessageDeque::reinsert(const QueuedMessage& message)
+size_t MessageDeque::index(const framing::SequenceNumber& position)
{
- messages.insert(lower_bound(messages.begin(), messages.end(), message), message);
-}
-
-MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position)
-{
- if (!messages.empty()) {
- QueuedMessage comp;
- comp.position = position;
- unsigned long diff = position.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
- return lower_bound(messages.begin(),messages.begin()+maxEnd,comp);
- } else {
- return messages.end();
- }
+ //assuming a monotonic sequence, with no messages removed except
+ //from the ends of the deque, we can use the position to determin
+ //an index into the deque
+ if (messages.empty() || position < messages.front().position) return 0;
+ return position - messages.front().position;
}
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+bool MessageDeque::deleted(const QueuedMessage& m)
{
- Deque::iterator i = seek(position);
- if (i != messages.end() && i->position == position) {
- message = *i;
- if (remove) messages.erase(i);
+ size_t i = index(m.position);
+ if (i < messages.size()) {
+ messages[i].status = QueuedMessage::DELETED;
+ clean();
return true;
} else {
return false;
}
}
-bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+size_t MessageDeque::size()
{
- return find(position, message, true);
+ return available;
}
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
+void MessageDeque::release(const QueuedMessage& message)
{
- return find(position, message, false);
+ size_t i = index(message.position);
+ if (i < messages.size()) {
+ QueuedMessage& m = messages[i];
+ if (m.status == QueuedMessage::ACQUIRED) {
+ if (head > i) head = i;
+ m.status = QueuedMessage::AVAILABLE;
+ ++available;
+ }
+ } else {
+ QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
+ }
}
-bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
- if (messages.empty()) {
- return false;
- } else if (position < front().position) {
- message = front();
- return true;
- } else {
- Deque::iterator i = seek(position+1);
- if (i != messages.end()) {
- message = *i;
+ size_t i = index(position);
+ if (i < messages.size()) {
+ QueuedMessage& temp = messages[i];
+ if (temp.status == QueuedMessage::AVAILABLE) {
+ temp.status = QueuedMessage::ACQUIRED;
+ --available;
+ message = temp;
return true;
- } else {
- return false;
}
}
+ return false;
}
-QueuedMessage& MessageDeque::front()
+bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
- return messages.front();
+ size_t i = index(position);
+ if (i < messages.size()) {
+ message = messages[i];
+ return true;
+ } else {
+ return false;
+ }
}
-void MessageDeque::pop()
+bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- if (!messages.empty()) {
- messages.pop_front();
+ //get first message that is greater than position
+ size_t i = index(position + 1);
+ while (i < messages.size()) {
+ QueuedMessage& m = messages[i++];
+ if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) {
+ message = m;
+ return true;
+ }
}
+ return false;
}
-bool MessageDeque::pop(QueuedMessage& out)
+bool MessageDeque::consume(QueuedMessage& message)
{
- if (messages.empty()) {
- return false;
- } else {
- out = front();
- messages.pop_front();
- return true;
+ while (head < messages.size()) {
+ QueuedMessage& i = messages[head++];
+ if (i.status == QueuedMessage::AVAILABLE) {
+ i.status = QueuedMessage::ACQUIRED;
+ --available;
+ message = i;
+ return true;
+ }
}
+ return false;
}
bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
+ //add padding to prevent gaps in sequence, which break the index
+ //calculation (needed for queue replication)
+ while (messages.size() && (added.position - messages.back().position) > 1) {
+ QueuedMessage dummy;
+ dummy.position = messages.back().position + 1;
+ dummy.status = QueuedMessage::DELETED;
+ messages.push_back(dummy);
+ QPID_LOG(debug, "Adding padding at " << dummy.position << ", between " << messages.back().position << " and " << added.position);
+ }
messages.push_back(added);
+ messages.back().status = QueuedMessage::AVAILABLE;
+ if (head >= messages.size()) head = messages.size() - 1;
+ ++available;
return false;//adding a message never causes one to be removed for deque
}
+void MessageDeque::clean()
+{
+ while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
+ messages.pop_front();
+ if (head) --head;
+ }
+}
+
void MessageDeque::foreach(Functor f)
{
- std::for_each(messages.begin(), messages.end(), f);
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE) {
+ f(*i);
+ }
+ }
}
void MessageDeque::removeIf(Predicate p)
{
- for (Deque::iterator i = messages.begin(); i != messages.end();) {
- if (p(*i)) {
- i = messages.erase(i);
- } else {
- ++i;
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE && p(*i)) {
+ //Use special status for this as messages are not yet
+ //dequeued, but should not be considered on the queue
+ //either (used for purging and moving)
+ i->status = QueuedMessage::REMOVED;
+ --available;
}
}
+ clean();
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h
index 0e1aef2986..4d3a5dcdd5 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.h
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.h
@@ -34,17 +34,14 @@ namespace broker {
class MessageDeque : public Messages
{
public:
+ MessageDeque();
size_t size();
- bool empty();
-
- void reinsert(const QueuedMessage&);
- bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool deleted(const QueuedMessage&);
+ void release(const QueuedMessage&);
+ bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
- bool next(const framing::SequenceNumber&, QueuedMessage&);
-
- QueuedMessage& front();
- void pop();
- bool pop(QueuedMessage&);
+ bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
+ bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void foreach(Functor);
@@ -53,9 +50,11 @@ class MessageDeque : public Messages
private:
typedef std::deque<QueuedMessage> Deque;
Deque messages;
+ size_t available;
+ size_t head;
- Deque::iterator seek(const framing::SequenceNumber&);
- bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+ size_t index(const framing::SequenceNumber&);
+ void clean();
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 7054ef0310..77f8a0b5df 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -204,7 +204,7 @@ MessageGroupManager::~MessageGroupManager()
}
bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- if (messages.empty())
+ if (!messages.size())
return false;
next.position = c->getPosition();
@@ -216,15 +216,16 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
}
}
- while (messages.next( next.position, next )) {
+ while (messages.browse( next.position, next, true )) {
GroupState& group = findGroup(next);
if (!group.owned()) {
- if (group.members.front() == next.position) { // only take from head!
+ //TODO: make acquire more efficient when we already have the message in question
+ if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head!
return true;
}
QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
<< "'s head message still pending. pos=" << group.members.front());
- } else if (group.owner == c->getName()) {
+ } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
return true;
}
}
@@ -249,9 +250,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess
bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
// browse: allow access to any available msg, regardless of group ownership (?ok?)
- if (!messages.empty() && messages.next(c->getPosition(), next))
- return true;
- return false;
+ return messages.browse(c->getPosition(), next, false);
}
void MessageGroupManager::query(qpid::types::Variant::Map& status) const
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp
index 39e23df533..048df45434 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp
@@ -27,6 +27,8 @@ namespace {
const std::string EMPTY;
}
+bool MessageMap::deleted(const QueuedMessage&) { return true; }
+
std::string MessageMap::getKey(const QueuedMessage& message)
{
const framing::FieldTable* ft = message.payload->getApplicationHeaders();
@@ -44,7 +46,7 @@ bool MessageMap::empty()
return messages.empty();
}
-void MessageMap::reinsert(const QueuedMessage& message)
+void MessageMap::release(const QueuedMessage& message)
{
std::string key = getKey(message);
Index::iterator i = index.find(key);
@@ -54,7 +56,7 @@ void MessageMap::reinsert(const QueuedMessage& message)
} //else message has already been replaced
}
-bool MessageMap::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
if (i != messages.end()) {
@@ -77,38 +79,22 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me
}
}
-bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
{
- if (!messages.empty() && position < front().position) {
- message = front();
+ Ordering::iterator i = messages.lower_bound(position+1);
+ if (i != messages.end()) {
+ message = i->second;
return true;
} else {
- Ordering::iterator i = messages.lower_bound(position+1);
- if (i != messages.end()) {
- message = i->second;
- return true;
- } else {
- return false;
- }
+ return false;
}
}
-QueuedMessage& MessageMap::front()
-{
- return messages.begin()->second;
-}
-
-void MessageMap::pop()
-{
- QueuedMessage dummy;
- pop(dummy);
-}
-
-bool MessageMap::pop(QueuedMessage& out)
+bool MessageMap::consume(QueuedMessage& message)
{
Ordering::iterator i = messages.begin();
if (i != messages.end()) {
- out = i->second;
+ message = i->second;
erase(i);
return true;
} else {
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h
index 1128a1d54a..d1b8217f9b 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.h
+++ b/qpid/cpp/src/qpid/broker/MessageMap.h
@@ -43,14 +43,12 @@ class MessageMap : public Messages
size_t size();
bool empty();
- void reinsert(const QueuedMessage&);
- virtual bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool deleted(const QueuedMessage&);
+ void release(const QueuedMessage&);
+ virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
- virtual bool next(const framing::SequenceNumber&, QueuedMessage&);
-
- QueuedMessage& front();
- void pop();
- bool pop(QueuedMessage&);
+ virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
+ bool consume(QueuedMessage&);
virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
void foreach(Functor);
diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h
index 448f17432a..89f6d383ae 100644
--- a/qpid/cpp/src/qpid/broker/Messages.h
+++ b/qpid/cpp/src/qpid/broker/Messages.h
@@ -46,22 +46,21 @@ class Messages
* @return the number of messages available for delivery.
*/
virtual size_t size() = 0;
+
/**
- * @return true if there are no messages for delivery, false otherwise
+ * Called when a message is deleted from the queue.
*/
- virtual bool empty() = 0;
-
+ virtual bool deleted(const QueuedMessage&) = 0;
/**
- * Re-inserts a message back into its original position - used
- * when requeing released messages.
+ * Releases an acquired message, making it available again.
*/
- virtual void reinsert(const QueuedMessage&) = 0;
+ virtual void release(const QueuedMessage&) = 0;
/**
- * Remove the message at the specified position, returning true if
- * found, false otherwise. The removed message is passed back via
- * the second parameter.
+ * Acquire the message at the specified position, returning true
+ * if found, false otherwise. The acquired message is passed back
+ * via the second parameter.
*/
- virtual bool remove(const framing::SequenceNumber&, QueuedMessage&) = 0;
+ virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&) = 0;
/**
* Find the message at the specified position, returning true if
* found, false otherwise. The matched message is passed back via
@@ -69,30 +68,22 @@ class Messages
*/
virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0;
/**
- * Return the next message to be given to a browsing subscrption
- * that has reached the specified poisition. The next messages is
- * passed back via the second parameter.
+ * Retrieve the next message to be given to a browsing
+ * subscription that has reached the specified position. The next
+ * message is passed back via the second parameter.
+ *
+ * @param unacquired, if true, will only browse unacquired messages
*
* @return true if there is another message, false otherwise.
*/
- virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
+ virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool unacquired) = 0;
/**
- * Note: Caller is responsible for ensuring that there is a front
- * (e.g. empty() returns false)
+ * Retrieve the next message available for a consuming
+ * subscription.
*
- * @return the next message to be delivered
- */
- virtual QueuedMessage& front() = 0;
- /**
- * Removes the front message
- */
- virtual void pop() = 0;
- /**
- * @return true if there is a mesage to be delivered - in which
- * case that message will be returned via the parameter and
- * removed - otherwise false.
+ * @return true if there is such a message, false otherwise.
*/
- virtual bool pop(QueuedMessage&) = 0;
+ virtual bool consume(QueuedMessage&) = 0;
/**
* Pushes a message to the back of the 'queue'. For some types of
* queue this may cause another message to be removed; if that is
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
index e07e73d323..d807ef22b1 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -32,6 +32,8 @@ PriorityQueue::PriorityQueue(int l) :
messages(levels, Deque()),
frontLevel(0), haveFront(false), cached(false) {}
+bool PriorityQueue::deleted(const QueuedMessage&) { return true; }
+
size_t PriorityQueue::size()
{
size_t total(0);
@@ -41,15 +43,7 @@ size_t PriorityQueue::size()
return total;
}
-bool PriorityQueue::empty()
-{
- for (int i = 0; i < levels; ++i) {
- if (!messages[i].empty()) return false;
- }
- return true;
-}
-
-void PriorityQueue::reinsert(const QueuedMessage& message)
+void PriorityQueue::release(const QueuedMessage& message)
{
uint p = getPriorityLevel(message);
messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
@@ -78,7 +72,7 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage&
return false;
}
-bool PriorityQueue::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
return find(position, message, true);
}
@@ -88,7 +82,7 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage&
return find(position, message, false);
}
-bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message)
+bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
{
QueuedMessage match;
match.position = position+1;
@@ -112,16 +106,7 @@ bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage&
return found;
}
-QueuedMessage& PriorityQueue::front()
-{
- if (checkFront()) {
- return messages[frontLevel].front();
- } else {
- throw qpid::framing::InternalErrorException(QPID_MSG("No message available"));
- }
-}
-
-bool PriorityQueue::pop(QueuedMessage& message)
+bool PriorityQueue::consume(QueuedMessage& message)
{
if (checkFront()) {
message = messages[frontLevel].front();
@@ -133,12 +118,6 @@ bool PriorityQueue::pop(QueuedMessage& message)
}
}
-void PriorityQueue::pop()
-{
- QueuedMessage dummy;
- pop(dummy);
-}
-
bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
messages[getPriorityLevel(added)].push_back(added);
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h
index 4bf9d26a9d..67c31468d2 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.h
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h
@@ -40,16 +40,13 @@ class PriorityQueue : public Messages
PriorityQueue(int levels);
virtual ~PriorityQueue() {}
size_t size();
- bool empty();
- void reinsert(const QueuedMessage&);
- bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool deleted(const QueuedMessage&);
+ void release(const QueuedMessage&);
+ bool acquire(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
- bool next(const framing::SequenceNumber&, QueuedMessage&);
-
- QueuedMessage& front();
- void pop();
- bool pop(QueuedMessage&);
+ bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
+ bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void foreach(Functor);
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 329fd1cb8c..b34bc65ec5 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -213,7 +213,7 @@ void Queue::requeue(const QueuedMessage& msg){
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
- messages->reinsert(msg);
+ messages->release(msg);
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
@@ -296,46 +296,41 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
+ QueuedMessage msg;
while (true) {
Mutex::ScopedLock locker(messageLock);
- QueuedMessage msg;
-
- if (!allocator->nextConsumableMessage(c, msg)) { // no next available
- QPID_LOG(debug, "No messages available to dispatch to consumer " <<
- c->getName() << " on queue '" << name << "'");
- listeners.addListener(c);
- return NO_MESSAGES;
- }
-
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->setPosition(msg.position);
- acquire( msg.position, msg, locker);
- dequeue( 0, msg );
- continue;
- }
-
- // a message is available for this consumer - can the consumer use it?
+ if (allocator->nextConsumableMessage(c, msg)) {
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ c->setPosition(msg.position)
+ ;
+ dequeue(0, msg);
+ continue;
+ }
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
- (void) ok; assert(ok);
- ok = acquire( msg.position, msg, locker);
- (void) ok; assert(ok);
- m = msg;
- c->setPosition(m.position);
- return CONSUMED;
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
+ (void) ok; assert(ok);
+ observeAcquire(msg, locker);
+ m = msg;
+ return CONSUMED;
+ } else {
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ messages->release(msg);
+ return CANT_CONSUME;
+ }
} else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ messages->release(msg);
return CANT_CONSUME;
}
} else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- c->setPosition(msg.position);
- return CANT_CONSUME;
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ listeners.addListener(c);
+ return NO_MESSAGES;
}
}
}
@@ -398,7 +393,6 @@ bool Queue::dispatch(Consumer::shared_ptr c)
}
bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
-
Mutex::ScopedLock locker(messageLock);
if (messages->find(pos, msg))
return true;
@@ -460,7 +454,7 @@ void Queue::cancel(Consumer::shared_ptr c){
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- if (messages->pop(msg))
+ if (messages->consume(msg))
observeAcquire(msg, locker);
return msg;
}
@@ -632,6 +626,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
// Update observers and message state:
observeAcquire(*qmsg, locker);
dequeue(0, *qmsg);
+ QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
// now reroute if necessary
if (dest.get()) {
assert(qmsg->payload);
@@ -663,24 +658,11 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
return c.matches.size();
}
-/** Acquire the front (oldest) message from the in-memory queue.
- * assumes messageLock held by caller
- */
-void Queue::pop(const Mutex::ScopedLock& locker)
-{
- assertClusterSafe();
- QueuedMessage msg;
- if (messages->pop(msg)) {
- observeAcquire(msg, locker);
- ++dequeueSincePurge;
- }
-}
-
/** Acquire the message at the given position, return true and msg if acquire succeeds */
bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
const Mutex::ScopedLock& locker)
{
- if (messages->remove(position, msg)) {
+ if (messages->acquire(position, msg)) {
observeAcquire(msg, locker);
++dequeueSincePurge;
return true;
@@ -867,12 +849,13 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
* Removes the first (oldest) message from the in-memory delivery queue as well dequeing
* it from the logical (and persistent if applicable) queue
*/
-void Queue::popAndDequeue(const Mutex::ScopedLock& held)
+bool Queue::popAndDequeue(QueuedMessage& msg)
{
- if (!messages->empty()) {
- QueuedMessage msg = messages->front();
- pop(held);
+ if (messages->consume(msg)) {
dequeue(0, msg);
+ return true;
+ } else {
+ return false;
}
}
@@ -884,6 +867,7 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
+ messages->deleted(msg);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -1070,10 +1054,10 @@ void Queue::destroyed()
unbind(broker->getExchanges());
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
- while(!messages->empty()){
- DeliverableMessage msg(messages->front().payload);
+ QueuedMessage m;
+ while(popAndDequeue(m)){
+ DeliverableMessage msg(m.payload);
alternateExchange->routeWithAlternate(msg);
- popAndDequeue(locker);
}
alternateExchange->decAlternateUsers();
}
@@ -1336,6 +1320,7 @@ void Queue::query(qpid::types::Variant::Map& results) const
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
+ QPID_LOG(info, "Set position to " << sequence << " on " << getName());
}
SequenceNumber Queue::getPosition() {
@@ -1421,6 +1406,12 @@ void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
observers.insert(observer);
}
+void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
+{
+ Mutex::ScopedLock locker(messageLock);
+ observers.erase(observer);
+}
+
void Queue::flush()
{
ScopedUse u(barrier);
@@ -1452,6 +1443,38 @@ void Queue::setDequeueSincePurge(uint32_t value) {
dequeueSincePurge = value;
}
+namespace{
+class FindLowest
+{
+ public:
+ FindLowest() : init(false) {}
+ void process(const QueuedMessage& message) {
+ QPID_LOG(debug, "FindLowest processing: " << message.position);
+ if (!init || message.position < lowest) lowest = message.position;
+ init = true;
+ }
+ bool getLowest(qpid::framing::SequenceNumber& result) {
+ if (init) {
+ result = lowest;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ private:
+ bool init;
+ qpid::framing::SequenceNumber lowest;
+};
+}
+
+bool Queue::getOldest(qpid::framing::SequenceNumber& oldest)
+{
+ //Horribly inefficient, but saves modifying Messages interface and
+ //all its implementations at present:
+ FindLowest f;
+ eachMessage(boost::bind(&FindLowest::process, &f, _1));
+ return f.getLowest(oldest);
+}
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 59ae41e768..b66600ef43 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -148,10 +148,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-
- /** modify the Queue's message container - assumes messageLock held */
- void pop(const sys::Mutex::ScopedLock& held); // acquire front msg
- void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
+ bool popAndDequeue(QueuedMessage&);
// acquire message @ position, return true and set msg if acquire succeeds
bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
const sys::Mutex::ScopedLock& held);
@@ -386,6 +383,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
void addObserver(boost::shared_ptr<QueueObserver>);
+ void removeObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
/**
* Notify queue that recovery has completed.
@@ -409,6 +407,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
+ bool getOldest(framing::SequenceNumber& result);
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.cpp b/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
new file mode 100644
index 0000000000..01c0c8e272
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueueReplicator.cpp
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueReplicator.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+QueueReplicator::QueueReplicator(const std::string& name, boost::shared_ptr<Queue> q) : Exchange(name, 0, 0), queue(q), current(queue->getPosition()) {}
+QueueReplicator::~QueueReplicator() {}
+
+namespace {
+const std::string DEQUEUE_EVENT("dequeue-event");
+const std::string REPLICATOR("qpid.replicator-");
+}
+
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
+{
+ if (key == DEQUEUE_EVENT) {
+ std::string content;
+ msg.getMessage().getFrames().getContent(content);
+ qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), content.size());
+ qpid::framing::SequenceSet latest;
+ latest.decode(buffer);
+
+ //TODO: should be able to optimise the following
+ for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
+ if (current < *i) {
+ //haven't got that far yet, record the dequeue
+ dequeued.add(*i);
+ QPID_LOG(debug, "Recording dequeue of message at " << *i << " from " << queue->getName());
+ } else {
+ QueuedMessage message;
+ if (queue->acquireMessageAt(*i, message)) {
+ queue->dequeue(0, message);
+ QPID_LOG(info, "Dequeued message at " << *i << " from " << queue->getName());
+ } else {
+ QPID_LOG(error, "Unable to dequeue message at " << *i << " from " << queue->getName());
+ }
+ }
+ }
+ } else {
+ //take account of any gaps in sequence created by messages
+ //dequeued before our subscription reached them
+ while (dequeued.contains(++current)) {
+ dequeued.remove(current);
+ QPID_LOG(debug, "Skipping dequeued message at " << current << " from " << queue->getName());
+ queue->setPosition(current);
+ }
+ QPID_LOG(info, "Enqueued message on " << queue->getName() << "; currently at " << current);
+ msg.deliverTo(queue);
+ }
+}
+
+bool QueueReplicator::isReplicatingLink(const std::string& name)
+{
+ return name.find(REPLICATOR) == 0;
+}
+
+boost::shared_ptr<Exchange> QueueReplicator::create(const std::string& target, QueueRegistry& queues)
+{
+ boost::shared_ptr<Exchange> exchange;
+ if (isReplicatingLink(target)) {
+ std::string queueName = target.substr(REPLICATOR.size());
+ boost::shared_ptr<Queue> queue = queues.find(queueName);
+ if (!queue) {
+ QPID_LOG(warning, "Unable to create replicator, can't find " << queueName);
+ } else {
+ //TODO: need to cache the replicator
+ QPID_LOG(info, "Creating replicator for " << queueName);
+ exchange.reset(new QueueReplicator(target, queue));
+ }
+ }
+ return exchange;
+}
+
+bool QueueReplicator::initReplicationSettings(const std::string& target, QueueRegistry& queues, qpid::framing::FieldTable& settings)
+{
+ if (isReplicatingLink(target)) {
+ std::string queueName = target.substr(REPLICATOR.size());
+ boost::shared_ptr<Queue> queue = queues.find(queueName);
+ if (queue) {
+ settings.setInt("qpid.replicating-subscription", 1);
+ settings.setInt("qpid.high_sequence_number", queue->getPosition());
+ qpid::framing::SequenceNumber oldest;
+ if (queue->getOldest(oldest)) {
+ settings.setInt("qpid.low_sequence_number", oldest);
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
+
+const std::string QueueReplicator::typeName("queue-replicator");
+
+std::string QueueReplicator::getType() const
+{
+ return typeName;
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueReplicator.h b/qpid/cpp/src/qpid/broker/QueueReplicator.h
new file mode 100644
index 0000000000..679aa9240d
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueueReplicator.h
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_QUEUEREPLICATOR_H
+#define QPID_BROKER_QUEUEREPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/SequenceSet.h"
+
+namespace qpid {
+namespace broker {
+
+class QueueRegistry;
+
+/**
+ * Dummy exchange for processing replication messages
+ */
+class QueueReplicator : public Exchange
+{
+ public:
+ QueueReplicator(const std::string& name, boost::shared_ptr<Queue>);
+ ~QueueReplicator();
+ std::string getType() const;
+ bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
+ bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
+ void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
+ bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const);
+ static bool isReplicatingLink(const std::string&);
+ static boost::shared_ptr<Exchange> create(const std::string&, QueueRegistry&);
+ static bool initReplicationSettings(const std::string&, QueueRegistry&, qpid::framing::FieldTable&);
+ static const std::string typeName;
+ private:
+ boost::shared_ptr<Queue> queue;
+ qpid::framing::SequenceNumber current;
+ qpid::framing::SequenceSet dequeued;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUEREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h
index 35e48b11f3..051ade41ea 100644
--- a/qpid/cpp/src/qpid/broker/QueuedMessage.h
+++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h
@@ -32,6 +32,7 @@ struct QueuedMessage
{
boost::intrusive_ptr<Message> payload;
framing::SequenceNumber position;
+ enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status;
Queue* queue;
QueuedMessage() : queue(0) {}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 2b9fd247f5..86ecba7aaa 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -26,6 +26,7 @@
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
@@ -114,7 +115,7 @@ void SemanticState::consume(const string& tag,
// "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
// Create a globally unique name so the broker can identify individual consumers
std::string name = session.getSessionId().str() + SEPARATOR + tag;
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -264,6 +265,224 @@ void SemanticState::record(const DeliveryRecord& delivery)
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
+class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver
+{
+ public:
+ ReplicatingSubscription(SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ ~ReplicatingSubscription();
+
+ void init();
+ void cancel();
+ bool deliver(QueuedMessage& msg);
+ void enqueued(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
+ void acquired(const QueuedMessage&) {}
+ void requeued(const QueuedMessage&) {}
+
+ protected:
+ bool doDispatch();
+ private:
+ boost::shared_ptr<Queue> events;
+ boost::shared_ptr<Consumer> consumer;
+ qpid::framing::SequenceSet range;
+
+ void generateDequeueEvent();
+ class DelegatingConsumer : public Consumer
+ {
+ public:
+ DelegatingConsumer(ReplicatingSubscription&);
+ ~DelegatingConsumer();
+ bool deliver(QueuedMessage& msg);
+ void notify();
+ bool filter(boost::intrusive_ptr<Message>);
+ bool accept(boost::intrusive_ptr<Message>);
+ void cancel() {}
+ OwnershipToken* getSession();
+ private:
+ ReplicatingSubscription& delegate;
+ };
+};
+
+SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments)
+{
+ if (arguments.isSet("qpid.replicating-subscription")) {
+ shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init();
+ return result;
+ } else {
+ return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ }
+}
+
+std::string mask(const std::string& in)
+{
+ return std::string("$") + in + std::string("_internal");
+}
+
+class ReplicationStateInitialiser
+{
+ public:
+ ReplicationStateInitialiser(qpid::framing::SequenceSet& results,
+ const qpid::framing::SequenceNumber& start,
+ const qpid::framing::SequenceNumber& end);
+ void operator()(const QueuedMessage& m) { process(m); }
+ private:
+ qpid::framing::SequenceSet& results;
+ const qpid::framing::SequenceNumber start;
+ const qpid::framing::SequenceNumber end;
+ void process(const QueuedMessage&);
+};
+
+ReplicatingSubscription::ReplicatingSubscription(SemanticState* _parent,
+ const string& _name,
+ Queue::shared_ptr _queue,
+ bool ack,
+ bool _acquire,
+ bool _exclusive,
+ const string& _tag,
+ const string& _resumeId,
+ uint64_t _resumeTtl,
+ const framing::FieldTable& _arguments
+) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, _resumeId, _resumeTtl, _arguments),
+ events(new Queue(mask(_name))),
+ consumer(new DelegatingConsumer(*this))
+{
+
+ if (_arguments.isSet("qpid.high_sequence_number")) {
+ qpid::framing::SequenceNumber hwm = _arguments.getAsInt("qpid.high_sequence_number");
+ qpid::framing::SequenceNumber lwm;
+ if (_arguments.isSet("qpid.low_sequence_number")) {
+ lwm = _arguments.getAsInt("qpid.low_sequence_number");
+ } else {
+ lwm = hwm;
+ }
+ qpid::framing::SequenceNumber oldest;
+ if (_queue->getOldest(oldest)) {
+ if (oldest >= hwm) {
+ range.add(lwm, --oldest);
+ } else if (oldest >= lwm) {
+ ReplicationStateInitialiser initialiser(range, lwm, hwm);
+ _queue->eachMessage(initialiser);
+ } else { //i.e. have older message on master than is reported to exist on replica
+ QPID_LOG(warning, "Replica appears to be missing message on master");
+ }
+ } else {
+ //local queue (i.e. master) is empty
+ range.add(lwm, _queue->getPosition());
+ }
+ QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << " are " << range
+ << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << _queue->getPosition() << ")");
+ //set position of 'cursor'
+ position = hwm;
+ }
+}
+
+bool ReplicatingSubscription::deliver(QueuedMessage& m)
+{
+ return ConsumerImpl::deliver(m);
+}
+
+void ReplicatingSubscription::init()
+{
+ getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+}
+
+void ReplicatingSubscription::cancel()
+{
+ getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+//called before we get notified of the message being available and
+//under the message lock in the queue
+void ReplicatingSubscription::enqueued(const QueuedMessage& m)
+{
+ QPID_LOG(debug, "Enqueued message at " << m.position);
+ //delay completion
+ m.payload->getIngressCompletion().startCompleter();
+ QPID_LOG(debug, "Delayed " << m.payload.get());
+}
+
+class Buffer : public qpid::framing::Buffer
+{
+ public:
+ Buffer(size_t size) : qpid::framing::Buffer(new char[size], size) {}
+ ~Buffer() { delete[] getPointer(); }
+};
+
+void ReplicatingSubscription::generateDequeueEvent()
+{
+ Buffer buffer(range.encodedSize());
+ range.encode(buffer);
+ range.clear();
+ buffer.reset();
+
+ //generate event message
+ boost::intrusive_ptr<Message> event = new Message();
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody()));
+ content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ event->getFrames().append(method);
+ event->getFrames().append(header);
+ event->getFrames().append(content);
+
+ DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ props->setRoutingKey("dequeue-event");
+
+ events->deliver(event);
+}
+
+//called after the message has been removed from the deque and under
+//the message lock in the queue
+void ReplicatingSubscription::dequeued(const QueuedMessage& m)
+{
+ {
+ Mutex::ScopedLock l(lock);
+ range.add(m.position);
+ QPID_LOG(debug, "Updated dequeue event to include message at " << m.position << "; subscription is at " << position);
+ }
+ notify();
+ if (m.position > position) {
+ m.payload->getIngressCompletion().finishCompleter();
+ QPID_LOG(debug, "Completed " << m.payload.get() << " early due to dequeue");
+ }
+}
+
+bool ReplicatingSubscription::doDispatch()
+{
+ {
+ Mutex::ScopedLock l(lock);
+ if (!range.empty()) {
+ generateDequeueEvent();
+ }
+ }
+ bool r1 = events->dispatch(consumer);
+ bool r2 = ConsumerImpl::doDispatch();
+ return r1 || r2;
+}
+
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
@@ -332,7 +551,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, credit.isWindowMode());
+ DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, dynamic_cast<const ReplicatingSubscription*>(this));
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
@@ -340,7 +559,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- queue->dequeue(0 /*ctxt*/, msg);
+ msg.queue->dequeue(0, msg);
record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
@@ -455,8 +674,10 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
- cacheExchange = session.getBroker().getExchanges().get(exchangeName);
+ if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) {
+ cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues());
+ if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName);
+ }
cacheExchange->setProperties(msg);
/* verify the userid if specified: */
@@ -646,9 +867,14 @@ bool SemanticState::ConsumerImpl::haveCredit()
}
}
+bool SemanticState::ConsumerImpl::doDispatch()
+{
+ return queue->dispatch(shared_from_this());
+}
+
void SemanticState::ConsumerImpl::flush()
{
- while(haveCredit() && queue->dispatch(shared_from_this()))
+ while(haveCredit() && doDispatch())
;
credit.cancel();
}
@@ -710,7 +936,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
bool SemanticState::ConsumerImpl::doOutput()
{
try {
- return haveCredit() && queue->dispatch(shared_from_this());
+ return haveCredit() && doDispatch();
} catch (const SessionException& e) {
throw SessionOutputException(e, parent->session.getChannel());
}
@@ -820,4 +1046,35 @@ void SemanticState::detached()
}
}
+ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
+ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
+{
+ return delegate.deliver(m);
+}
+void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
+bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
+bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
+
+ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r,
+ const qpid::framing::SequenceNumber& s,
+ const qpid::framing::SequenceNumber& e)
+ : results(r), start(s), end(e)
+{
+ results.add(start, end);
+}
+
+void ReplicationStateInitialiser::process(const QueuedMessage& message)
+{
+ if (message.position < start) {
+ //replica does not have a message that should still be on the queue
+ QPID_LOG(warning, "Replica appears to be missing message at " << message.position);
+ } else if (message.position >= start && message.position <= end) {
+ //i.e. message is within the intial range and has not been dequeued, so remove it from the results
+ results.remove(message.position);
+ } //else message has not been seen by replica yet so can be ignored here
+
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 26fd815424..ec4bcb756c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -30,6 +30,7 @@
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/NameGenerator.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/framing/FrameHandler.h"
@@ -74,7 +75,9 @@ class SemanticState : private boost::noncopyable {
public boost::enable_shared_from_this<ConsumerImpl>,
public management::Manageable
{
+ protected:
mutable qpid::sys::Mutex lock;
+ private:
SemanticState* const parent;
const boost::shared_ptr<Queue> queue;
const bool ackExpected;
@@ -95,17 +98,20 @@ class SemanticState : private boost::noncopyable {
void allocateCredit(boost::intrusive_ptr<Message>& msg);
bool haveCredit();
+ protected:
+ virtual bool doDispatch();
+ size_t unacked() { return parent->unacked.size(); }
+
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
- const std::string& tag, const std::string& resumeId,
- uint64_t resumeTtl, const framing::FieldTable& arguments);
- ~ConsumerImpl();
+ const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ virtual ~ConsumerImpl();
OwnershipToken* getSession();
- bool deliver(QueuedMessage& msg);
+ virtual bool deliver(QueuedMessage& msg);
bool filter(boost::intrusive_ptr<Message> msg);
bool accept(boost::intrusive_ptr<Message> msg);
void cancel() {}
@@ -142,9 +148,16 @@ class SemanticState : private boost::noncopyable {
SemanticState& getParent() { return *parent; }
const SemanticState& getParent() const { return *parent; }
+
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
+
+ static shared_ptr create(SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+
};
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 715376fd8d..2d05755fc7 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -275,7 +275,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m
populate(*message, *command);
}
const MessageTransferBody* transfer = command->as<MessageTransferBody>();
- if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
+ if (transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
sys::Mutex::ScopedLock l(lock);
acceptTracker.delivered(transfer->getDestination(), command->getId());
}
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 0b1b4cc59e..9a76bb28e1 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -711,7 +711,7 @@ namespace {
const std::string& expectedGroup,
const int expectedId )
{
- queue->dispatch(c);
+ BOOST_CHECK(queue->dispatch(c));
results.push_back(c->last);
std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
@@ -1026,6 +1026,11 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 6u);
+ /**
+ * TODO: Fix or replace the following test which incorrectly requeues a
+ * message that was never on the queue in the first place. This relied on
+ * internal details not part of the queue abstraction.
+
// check requeue 1
intrusive_ptr<Message> msg4 = create_message("e", "C");
intrusive_ptr<Message> msg5 = create_message("e", "D");
@@ -1047,6 +1052,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue2->clearLastNodeFailure();
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
+ */
}
QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index db5ec03df2..935db54458 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -886,9 +886,11 @@ class ReceiverTests(Base):
rc = self.ssn.receiver('test-receiver-queue; {mode: consume}')
self.drain(rb, expected=msgs)
self.drain(rc, expected=msgs)
- rb2 = self.ssn.receiver(rb.source)
- self.assertEmpty(rb2)
+ rc2 = self.ssn.receiver(rc.source)
+ self.assertEmpty(rc2)
self.drain(self.rcv, expected=[])
+ rb2 = self.ssn.receiver(rb.source)
+ self.drain(rb2, expected=msgs)
# XXX: need testUnsettled()
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
index 204b6ebd23..c6095a0579 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
@@ -1033,8 +1033,7 @@ class MessageTests(TestBase010):
#release all even messages
session.message_release(RangedSet(msg.id))
- #browse:
- session.message_subscribe(queue="q", destination="b", acquire_mode=1)
+ session.message_subscribe(queue="q", destination="b", acquire_mode=0)
b = session.incoming("b")
b.start()
for i in [2, 4, 6, 8, 10]:
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
index 99d11151e8..ef6734f136 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -202,6 +202,10 @@ class MultiConsumerMsgGroupTests(Base):
## Queue = A-0, B-1, A-2, b-3, C-4
## Owners= ^C1, ---, +C1, ---, ---
+ m2 = b1.fetch(0);
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 0
+
m2 = b1.fetch(0)
assert m2.properties['THE-GROUP'] == 'B'
assert m2.content['index'] == 1
@@ -713,6 +717,7 @@ class MultiConsumerMsgGroupTests(Base):
assert rc.status == 0
queue.update()
queue.msgDepth == 4 # the pending acquired A still counts!
+ s1.acknowledge()
# verify all other A's removed....
s2 = self.setup_session()
@@ -782,7 +787,7 @@ class MultiConsumerMsgGroupTests(Base):
except Empty:
pass
assert count == 3 # non-A's
- assert a_count == 1 # and one is an A
+ assert a_count == 2 # pending acquired message included in browse results
s1.acknowledge() # ack the consumed A-0
self.qmf_session.delBroker(self.qmf_broker)
@@ -829,7 +834,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify all other A's removed from msg-group-q
s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
+ b1 = s2.receiver("msg-group-q", options={"capacity":0})
count = 0
try:
while True:
@@ -963,7 +968,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify all other A's removed....
s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
+ b1 = s2.receiver("msg-group-q", options={"capacity":0})
count = 0
try:
while True: