summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-08 09:24:15 +0000
committerAlan Conway <aconway@apache.org>2014-08-08 09:24:15 +0000
commit2602ecaf16a3ddf424383214da2ea846634c083f (patch)
treecbe7e6a423e2d521c2ebce63a479f2a4e3074ae9
parenta833f714a4de983bce8fb1c2f6b87070bd3b4309 (diff)
downloadqpid-python-2602ecaf16a3ddf424383214da2ea846634c083f.tar.gz
QPID-5966: HA mixing tx enqueue and non-tx dequeue leaves extra messages on backup.
There were several problems: 1. Positions of transactionally enqueued messages not known to QueueReplicator, so not dequeued on backup if dequeued outside a TX on primary. 2. Race condition if tx created immediately after queue could cause duplication of TX message. 3. Replication IDs were not being set during recovery from store (regression, store change?) Fix: 1. Update positions QueueReplicator positions via QueueObserver::enqueued to see all enqueues. 2. Check for duplicate replication-ids on backup in QueueReplicator::route. 3. Set replication-id in publish() if not already set in record(). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616704 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/IdSetter.h25
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp88
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h4
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h4
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h1
-rw-r--r--qpid/cpp/src/qpid/ha/types.cpp27
-rw-r--r--qpid/cpp/src/qpid/ha/types.h18
-rw-r--r--qpid/cpp/src/tests/brokertest.py17
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py57
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp12
17 files changed, 228 insertions, 86 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 7e15ac1ad2..250acf6b4e 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -29,6 +29,7 @@
#include "qpid/management/Manageable.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
+#include "qpid/assert.h"
#include <algorithm>
#include <string.h>
@@ -46,11 +47,11 @@ using std::string;
namespace qpid {
namespace broker {
-Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0)
+Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false)
{}
Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p)
- : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0)
+ : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false)
{
if (persistentContext) persistentContext->setIngressCompletion(e);
}
@@ -297,9 +298,18 @@ void Message::processProperties(MapHandler& handler) const
sharedState->processProperties(handler);
}
-uint64_t Message::getReplicationId() const { return replicationId; }
+bool Message::hasReplicationId() const {
+ return isReplicationIdSet;
+}
+
+uint64_t Message::getReplicationId() const {
+ return replicationId;
+}
-void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; }
+void Message::setReplicationId(framing::SequenceNumber id) {
+ replicationId = id;
+ isReplicationIdSet = true;
+}
sys::AbsTime Message::getExpiration() const
{
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 4d42b173c0..fe0427abd3 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -168,6 +168,7 @@ public:
QPID_BROKER_EXTERN boost::intrusive_ptr<AsyncCompletion> getIngressCompletion() const;
QPID_BROKER_EXTERN boost::intrusive_ptr<PersistableMessage> getPersistentContext() const;
+ QPID_BROKER_EXTERN bool hasReplicationId() const;
QPID_BROKER_EXTERN uint64_t getReplicationId() const;
QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id);
@@ -214,6 +215,7 @@ public:
MessageState state;
qpid::framing::SequenceNumber sequence;
framing::SequenceNumber replicationId;
+ bool isReplicationIdSet:1;
void annotationsChanged();
bool getTtl(uint64_t&, uint64_t expiredValue) const;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 027448905b..f154e45a22 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -71,7 +71,7 @@ class HaBroker::BrokerObserver : public broker::BrokerObserver {
public:
void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
- q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter));
+ q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter(q->getName())));
}
};
diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h
index 67da62ef48..0350bf1519 100644
--- a/qpid/cpp/src/qpid/ha/IdSetter.h
+++ b/qpid/cpp/src/qpid/ha/IdSetter.h
@@ -43,10 +43,31 @@ namespace ha {
class IdSetter : public broker::MessageInterceptor
{
public:
- IdSetter(ReplicationId firstId=1) : nextId(firstId) {}
- void record(broker::Message& m) { m.setReplicationId(nextId++); }
+ IdSetter(const std::string& q, ReplicationId firstId=1) : queue(q), nextId(firstId) {
+ QPID_LOG(debug, "Replication-ID will be set for " << queue << " from " << firstId);
+ }
+
+ void record(broker::Message& m) {
+ // Record is called when a message is first delivered to a queue, before it has
+ // been enqueued or saved in a transaction buffer. This is when we normally want
+ // to assign a replication-id.
+ m.setReplicationId(nextId++);
+ QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
+ }
+
+ void publish(broker::Message& m) {
+ // Publish is called when a message is assigned a position on the queue,
+ // after any transaction has comitted. Normally this is too late to
+ // assign a replication-id but during broker start-up and recovery from
+ // store record() is not called, so set the ID now if not already set.
+ if (!m.hasReplicationId()) {
+ m.setReplicationId(nextId++);
+ QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m));
+ }
+ }
private:
+ std::string queue;
sys::AtomicValue<uint32_t> nextId;
};
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index be3dc25653..e7d77f9810 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -148,7 +148,7 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
{
Mutex::ScopedLock l(lock);
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
- QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
+ QPID_LOG(trace, logPrefix << "Enqueue: " << logMessageId(*q, m.getReplicationId()));
checkState(SENDING, "Too late for enqueue");
empty = false;
enqueues[q] += m.getReplicationId();
@@ -163,7 +163,7 @@ void PrimaryTxObserver::dequeue(
Mutex::ScopedLock l(lock);
checkState(SENDING, "Too late for dequeue");
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
- QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
+ QPID_LOG(trace, logPrefix << "Dequeue: " << logMessageId(*q, pos, id));
empty = false;
dequeues[q] += id;
txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 6ffd53ff21..94b7a53937 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -72,7 +72,7 @@ void QueueGuard::enqueued(const Message& m) {
ReplicationId id = m.getReplicationId();
Mutex::ScopedLock l(lock);
if (cancelled) return; // Don't record enqueues after we are cancelled.
- QPID_LOG(trace, logPrefix << "Delayed completion of " << LogMessageId(queue, m));
+ QPID_LOG(trace, logPrefix << "Delayed completion of " << logMessageId(queue, m));
delayed[id] = m.getIngressCompletion();
m.getIngressCompletion()->startCompleter();
}
@@ -80,7 +80,7 @@ void QueueGuard::enqueued(const Message& m) {
// NOTE: Called with message lock held.
void QueueGuard::dequeued(const Message& m) {
ReplicationId id = m.getReplicationId();
- QPID_LOG(trace, logPrefix << "Dequeued " << LogMessageId(queue, m));
+ QPID_LOG(trace, logPrefix << "Dequeued " << logMessageId(queue, m));
Mutex::ScopedLock l(lock);
complete(id, l);
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 4e908fbe79..ca06fabe86 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -106,22 +106,34 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
class QueueReplicator::QueueObserver : public broker::QueueObserver {
public:
- QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {}
- void enqueued(const Message&) {}
- void dequeued(const Message&) {}
+ typedef boost::shared_ptr<QueueReplicator> Ptr;
+ QueueObserver(Ptr qr) : queueReplicator(qr) {}
+
+ void enqueued(const Message& m) {
+ Ptr qr = queueReplicator.lock();
+ if (qr) qr->enqueued(m);
+ }
+
+ void dequeued(const Message& m) {
+ Ptr qr = queueReplicator.lock();
+ if (qr) qr->dequeued(m);
+ }
+
void acquired(const Message&) {}
void requeued(const Message&) {}
void consumerAdded( const Consumer& ) {}
void consumerRemoved( const Consumer& ) {}
// Queue observer is destroyed when the queue is.
void destroy() {
- boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
+ Ptr qr = queueReplicator.lock();
if (qr) qr->destroy();
}
+
private:
boost::weak_ptr<QueueReplicator> queueReplicator;
};
+
boost::shared_ptr<QueueReplicator> QueueReplicator::create(
HaBroker& hb, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l)
{
@@ -278,48 +290,73 @@ void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
QPID_LOG(trace, logPrefix << "Dequeue " << e.ids);
//TODO: should be able to optimise the following
for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) {
- PositionMap::iterator j = positions.find(*i);
- if (j != positions.end()) queue->dequeueMessageAt(j->second);
+ QueuePosition position;
+ {
+ Mutex::ScopedLock l(lock);
+ PositionMap::iterator j = positions.find(*i);
+ if (j == positions.end()) continue;
+ position = j->second;
+ }
+ queue->dequeueMessageAt(position); // Outside lock, will call dequeued().
+ // positions will be cleaned up in dequeued()
}
}
// Called in connection thread of the queues bridge to primary.
-
void QueueReplicator::route(Deliverable& deliverable)
{
try {
- Mutex::ScopedLock l(lock);
- if (!queue) return; // Already destroyed
broker::Message& message(deliverable.getMessage());
- string key(message.getRoutingKey());
- if (!isEventKey(message.getRoutingKey())) {
+ {
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
+ string key(message.getRoutingKey());
+ if (isEventKey(key)) {
+ DispatchMap::iterator i = dispatch.find(key);
+ if (i == dispatch.end()) {
+ QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
+ } else {
+ (i->second)(message.getContent(), l);
+ }
+ return;
+ }
ReplicationId id = nextId++;
- maxId = std::max(maxId, id);
message.setReplicationId(id);
- deliver(message);
- QueuePosition position = queue->getPosition();
- positions[id] = position;
- QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
- }
- else {
- DispatchMap::iterator i = dispatch.find(key);
- if (i == dispatch.end()) {
- QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
- }
- else {
- (i->second)(message.getContent(), l);
+ PositionMap::iterator i = positions.find(id);
+ if (i != positions.end()) {
+ QPID_LOG(trace, logPrefix << "Already on queue: " << logMessageId(*queue, message));
+ return;
}
+ QPID_LOG(trace, logPrefix << "Received: " << logMessageId(*queue, message));
}
+ deliver(message); // Outside lock, will call enqueued()
}
catch (const std::exception& e) {
haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
}
+
}
void QueueReplicator::deliver(const broker::Message& m) {
queue->deliver(m);
}
+// Called via QueueObserver when message is enqueued. Could be as part of deliver()
+// or in a different thread if a message is enqueued via a transaction.
+//
+void QueueReplicator::enqueued(const broker::Message& m) {
+ Mutex::ScopedLock l(lock);
+ maxId = std::max(maxId, ReplicationId(m.getReplicationId()));
+ positions[m.getReplicationId()] = m.getSequence();
+ QPID_LOG(trace, logPrefix << "Enqueued " << logMessageId(*queue, m));
+}
+
+// Called via QueueObserver
+void QueueReplicator::dequeued(const broker::Message& m) {
+ Mutex::ScopedLock l(lock);
+ positions.erase(m.getReplicationId());
+}
+
void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
@@ -349,8 +386,9 @@ std::string QueueReplicator::getType() const { return ReplicatingSubscription::Q
void QueueReplicator::promoted() {
if (queue) {
// On primary QueueReplicator no longer sets IDs, start an IdSetter.
+ QPID_LOG(debug, logPrefix << "Promoted, first replication-id " << maxId+1)
queue->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
+ boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), maxId+1)));
// Process auto-deletes
if (queue->isAutoDelete()) {
// Make a temporary shared_ptr to prevent premature deletion of queue.
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 757f12c7a9..3d525db440 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -80,6 +80,10 @@ class QueueReplicator : public broker::Exchange,
void route(broker::Deliverable&);
+ // Called via QueueObserver
+ void enqueued(const broker::Message&);
+ void dequeued(const broker::Message&);
+
// Set if the queue has ever been subscribed to, used for auto-delete cleanup.
void setSubscribed() { subscribed = true; }
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 908458fad3..67e1e77681 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -216,14 +216,14 @@ bool ReplicatingSubscription::deliver(
try {
bool result = false;
if (skipEnqueue.contains(id)) {
- QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
+ QPID_LOG(trace, logPrefix << "Skip " << logMessageId(*getQueue(), m));
skipEnqueue -= id;
guard->complete(id); // This will never be acknowledged.
notify();
result = true;
}
else {
- QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
+ QPID_LOG(trace, logPrefix << "Replicated " << logMessageId(*getQueue(), m));
if (!ready && !isGuarded(l)) unready += id;
sendIdEvent(id, l);
result = ConsumerImpl::deliver(c, m);
@@ -231,7 +231,7 @@ bool ReplicatingSubscription::deliver(
checkReady(l);
return result;
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << LogMessageId(*getQueue(), m)
+ QPID_LOG(critical, logPrefix << "Error replicating " << logMessageId(*getQueue(), m)
<< ": " << e.what());
throw;
}
@@ -268,7 +268,7 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
ReplicationId id = r.getReplicationId();
QPID_LOG(trace, logPrefix << "Acknowledged " <<
- LogMessageId(*getQueue(), r.getMessageId(), id));
+ logMessageId(*getQueue(), r.getMessageId(), id));
guard->complete(id);
{
Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 0e3f544d44..08c08b0ca3 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -137,8 +137,8 @@ class ReplicatingSubscription :
BrokerInfo getBrokerInfo() const { return info; }
-void skipEnqueues(const ReplicationIdSet& ids);
-void skipDequeues(const ReplicationIdSet& ids);
+ void skipEnqueues(const ReplicationIdSet& ids);
+ void skipDequeues(const ReplicationIdSet& ids);
protected:
bool doDispatch();
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index d2a647ae8f..ee8bd342b2 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -127,20 +127,19 @@ void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLoc
}
}
-void TxReplicator::route(broker::Deliverable& deliverable) {
- QueueReplicator::route(deliverable);
-}
-
void TxReplicator::deliver(const broker::Message& m_) {
- sys::Mutex::ScopedLock l(lock);
- if (!txBuffer) return;
- // Deliver message to the target queue, not the tx-queue.
+ boost::intrusive_ptr<broker::TxBuffer> txbuf;
broker::Message m(m_);
- m.setReplicationId(enq.id); // Use replicated id.
- boost::shared_ptr<broker::Queue> queue =
- haBroker.getBroker().getQueues().get(enq.queue);
- QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m));
- DeliverableMessage dm(m, txBuffer.get());
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
+ txbuf = txBuffer;
+ m.setReplicationId(enq.id); // Use enqueued replicated id.
+ }
+ // Deliver message to the target queue, not the tx-queue.
+ boost::shared_ptr<broker::Queue> queue = haBroker.getBroker().getQueues().get(enq.queue);
+ QPID_LOG(trace, logPrefix << "Deliver " << logMessageId(*queue, m.getReplicationId()));
+ DeliverableMessage dm(m, txbuf.get());
dm.deliverTo(queue);
}
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index 5c509d14a7..fe25fbc78b 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -66,7 +66,6 @@ class TxReplicator : public QueueReplicator {
std::string getType() const;
// QueueReplicator overrides
- void route(broker::Deliverable& deliverable);
using QueueReplicator::destroy;
void destroy(sys::Mutex::ScopedLock&);
diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp
index c02ae33470..00058a8a32 100644
--- a/qpid/cpp/src/qpid/ha/types.cpp
+++ b/qpid/cpp/src/qpid/ha/types.cpp
@@ -95,17 +95,24 @@ ostream& operator<<(ostream& o, const UuidSet& ids) {
return o;
}
-LogMessageId::LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id) :
- queue(q.getName()), position(pos), replicationId(id) {}
-LogMessageId::LogMessageId(const broker::Queue& q, const broker::Message& m) :
- queue(q.getName()), position(m.getSequence()), replicationId(m.getReplicationId()) {}
-
-LogMessageId::LogMessageId(const std::string& q, const broker::Message& m) :
- queue(q), position(m.getSequence()), replicationId(m.getReplicationId()) {}
-
-std::ostream& operator<<(std::ostream& o, const LogMessageId& m) {
- return o << m.queue << "[" << m.position << "]=" << m.replicationId;
+std::string logMessageId(const std::string& q, QueuePosition pos, ReplicationId id) {
+ return Msg() << q << "[" << pos << "]" << "=" << id;
+}
+std::string logMessageId(const std::string& q, ReplicationId id) {
+ return Msg() << q << "[]" << "=" << id;
+}
+std::string logMessageId(const std::string& q, const broker::Message& m) {
+ return logMessageId(q, m.getSequence(), m.getReplicationId());
+}
+std::string logMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id) {
+ return logMessageId(q.getName(), pos, id);
+}
+std::string logMessageId(const broker::Queue& q, ReplicationId id) {
+ return logMessageId(q.getName(), id);
+}
+std::string logMessageId(const broker::Queue& q, const broker::Message& m) {
+ return logMessageId(q.getName(), m);
}
void UuidSet::encode(framing::Buffer& b) const {
diff --git a/qpid/cpp/src/qpid/ha/types.h b/qpid/cpp/src/qpid/ha/types.h
index 92157d411b..ae4b948dfc 100644
--- a/qpid/cpp/src/qpid/ha/types.h
+++ b/qpid/cpp/src/qpid/ha/types.h
@@ -132,17 +132,13 @@ typedef framing::SequenceNumber ReplicationId;
typedef framing::SequenceSet QueuePositionSet;
typedef framing::SequenceSet ReplicationIdSet;
-/** Helper for logging message ID */
-struct LogMessageId {
- typedef boost::shared_ptr<broker::Queue> QueuePtr;
- LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id);
- LogMessageId(const broker::Queue& q, const broker::Message& m);
- LogMessageId(const std::string& q, const broker::Message& m);
- const std::string& queue;
- QueuePosition position;
- ReplicationId replicationId;
-};
-std::ostream& operator<<(std::ostream&, const LogMessageId&);
+/** Helpers for logging message ID */
+std::string logMessageId(const std::string& q, QueuePosition pos, ReplicationId id);
+std::string logMessageId(const std::string& q, ReplicationId id);
+std::string logMessageId(const std::string& q, const broker::Message& m);
+std::string logMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id);
+std::string logMessageId(const broker::Queue& q, ReplicationId id);
+std::string logMessageId(const broker::Queue& q, const broker::Message& m);
/** Return short version of human-readable UUID. */
inline std::string shortStr(const types::Uuid& uuid) { return uuid.str().substr(0,8); }
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 44824fe67e..461ef0de9a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -428,17 +428,22 @@ class Broker(Popen):
assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line)
finally: log.close()
+def receiver_iter(receiver, timeout=0):
+ """Make an iterator out of a receiver. Returns messages till Empty is raised."""
+ try:
+ while True:
+ yield receiver.fetch(timeout=timeout)
+ except qm.Empty:
+ pass
+
def browse(session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
r.capacity = 100
try:
- contents = []
- try:
- while True: contents.append(transform(r.fetch(timeout=timeout)))
- except qm.Empty: pass
- finally: r.close()
- return contents
+ return [transform(m) for m in receiver_iter(r, timeout)]
+ finally:
+ r.close()
def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
"""Assert that the contents of messages on queue (as retrieved
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f71560dffb..58b3ff2802 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1217,7 +1217,6 @@ class RecoveryTests(HaBrokerTest):
def test_stalled_backup(self):
"""Make sure that a stalled backup broker does not stall the primary"""
- # FIXME aconway 2014-04-15: merge with test_join_ready_cluster?
cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"])
os.kill(cluster[1].pid, signal.SIGSTOP)
s = cluster[0].connect().session()
@@ -1272,7 +1271,7 @@ class StoreTests(HaBrokerTest):
"""Verify that a backup erases queue data from store recovery before
doing catch-up from the primary."""
if self.check_skip(): return
- cluster = HaCluster(self, 2)
+ cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store'])
sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True))
@@ -1362,9 +1361,61 @@ class TransactionTests(HaBrokerTest):
tx.acknowledge()
tx.commit()
tx.sync()
+ tx.close()
+
+ for b in cluster:
+ self.assert_simple_commit_outcome(b, tx_queues)
+
+ # Verify non-tx dequeue is replicated correctly
+ c = cluster.connect(0, protocol=self.tx_protocol)
+ r = c.session().receiver("b")
+ ri = receiver_iter(r, timeout=1)
+ self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri])
+ r.session.acknowledge()
+ for b in cluster: b.assert_browse_backup("b", [], msg=b)
+
+ def check_enq_deq(self, cluster, queue, expect):
+ for b in cluster:
+ q = b.agent.getQueue(queue)
+ self.assertEqual(
+ (b.name,)+expect,
+ (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues))
+
+ def test_tx_enq_notx_deq(self):
+ """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
+ cluster = HaCluster(self, 2, test_store=True)
+ c = cluster.connect(0, protocol=self.tx_protocol)
+ tx = c.session(transactional=True)
+ c.session().sender("qq;{create:always}").send("m1")
+ tx.sender("qq;{create:always}").send("tx")
+ tx.commit()
tx.close()
- for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+ c.session().sender("qq;{create:always}").send("m2")
+ self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0))
+
+ notx = c.session()
+ self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))])
+ notx.acknowledge()
+ self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0))
+ for b in cluster: b.assert_browse_backup('qq', [], msg=b)
+ for b in cluster: self.assert_tx_clean(b)
+
+ def test_tx_enq_notx_deq_qpid_send(self):
+ """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
+ cluster = HaCluster(self, 2, test_store=True)
+
+ self.popen(
+ ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1',
+ '--content-string=foo']
+ ).assert_exit_ok()
+ for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b)
+ self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0))
+
+ self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok()
+ self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0))
+ for b in cluster: b.assert_browse_backup('qq', [], msg=b)
+ for b in cluster: self.assert_tx_clean(b)
def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index a744d07a12..3d9941baee 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -205,7 +205,7 @@ struct Transfer : public TransactionalClient, public Runnable
}
session.commit();
t++;
- if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+ if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
} catch (const TransactionAborted&) {
std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
session = connection.createTransactionalSession();
@@ -246,6 +246,16 @@ struct Controller : public Client
for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
std::string address = *i + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE);
+
+ // Clear out any garbage on queues.
+ Receiver receiver = session.createReceiver(address);
+ Message rmsg;
+ uint count(0);
+ while (receiver.fetch(rmsg, Duration::IMMEDIATE)) ++count;
+ session.acknowledge();
+ receiver.close();
+ if (!opts.quiet) std::cout << "Cleaned up " << count << " messages from " << *i << std::endl;
+
Sender sender = session.createSender(address);
if (i == queues.begin()) {
for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) {