diff options
author | Alan Conway <aconway@apache.org> | 2014-08-08 09:24:15 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-08-08 09:24:15 +0000 |
commit | 2602ecaf16a3ddf424383214da2ea846634c083f (patch) | |
tree | cbe7e6a423e2d521c2ebce63a479f2a4e3074ae9 | |
parent | a833f714a4de983bce8fb1c2f6b87070bd3b4309 (diff) | |
download | qpid-python-2602ecaf16a3ddf424383214da2ea846634c083f.tar.gz |
QPID-5966: HA mixing tx enqueue and non-tx dequeue leaves extra messages on backup.
There were several problems:
1. Positions of transactionally enqueued messages not known to QueueReplicator, so not dequeued
on backup if dequeued outside a TX on primary.
2. Race condition if tx created immediately after queue could cause duplication of TX message.
3. Replication IDs were not being set during recovery from store (regression, store change?)
Fix:
1. Update positions QueueReplicator positions via QueueObserver::enqueued to see all enqueues.
2. Check for duplicate replication-ids on backup in QueueReplicator::route.
3. Set replication-id in publish() if not already set in record().
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616704 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/IdSetter.h | 25 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 88 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.cpp | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/types.cpp | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/types.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 17 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 57 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 12 |
17 files changed, 228 insertions, 86 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 7e15ac1ad2..250acf6b4e 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -29,6 +29,7 @@ #include "qpid/management/Manageable.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" +#include "qpid/assert.h" #include <algorithm> #include <string.h> @@ -46,11 +47,11 @@ using std::string; namespace qpid { namespace broker { -Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0) +Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false) {} Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p) - : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0) + : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false) { if (persistentContext) persistentContext->setIngressCompletion(e); } @@ -297,9 +298,18 @@ void Message::processProperties(MapHandler& handler) const sharedState->processProperties(handler); } -uint64_t Message::getReplicationId() const { return replicationId; } +bool Message::hasReplicationId() const { + return isReplicationIdSet; +} + +uint64_t Message::getReplicationId() const { + return replicationId; +} -void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; } +void Message::setReplicationId(framing::SequenceNumber id) { + replicationId = id; + isReplicationIdSet = true; +} sys::AbsTime Message::getExpiration() const { diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 4d42b173c0..fe0427abd3 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -168,6 +168,7 @@ public: QPID_BROKER_EXTERN boost::intrusive_ptr<AsyncCompletion> getIngressCompletion() const; QPID_BROKER_EXTERN boost::intrusive_ptr<PersistableMessage> getPersistentContext() const; + QPID_BROKER_EXTERN bool hasReplicationId() const; QPID_BROKER_EXTERN uint64_t getReplicationId() const; QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id); @@ -214,6 +215,7 @@ public: MessageState state; qpid::framing::SequenceNumber sequence; framing::SequenceNumber replicationId; + bool isReplicationIdSet:1; void annotationsChanged(); bool getTtl(uint64_t&, uint64_t expiredValue) const; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 027448905b..f154e45a22 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -71,7 +71,7 @@ class HaBroker::BrokerObserver : public broker::BrokerObserver { public: void queueCreate(const boost::shared_ptr<broker::Queue>& q) { q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot)); - q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter)); + q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter(q->getName()))); } }; diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h index 67da62ef48..0350bf1519 100644 --- a/qpid/cpp/src/qpid/ha/IdSetter.h +++ b/qpid/cpp/src/qpid/ha/IdSetter.h @@ -43,10 +43,31 @@ namespace ha { class IdSetter : public broker::MessageInterceptor { public: - IdSetter(ReplicationId firstId=1) : nextId(firstId) {} - void record(broker::Message& m) { m.setReplicationId(nextId++); } + IdSetter(const std::string& q, ReplicationId firstId=1) : queue(q), nextId(firstId) { + QPID_LOG(debug, "Replication-ID will be set for " << queue << " from " << firstId); + } + + void record(broker::Message& m) { + // Record is called when a message is first delivered to a queue, before it has + // been enqueued or saved in a transaction buffer. This is when we normally want + // to assign a replication-id. + m.setReplicationId(nextId++); + QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m.getReplicationId())); + } + + void publish(broker::Message& m) { + // Publish is called when a message is assigned a position on the queue, + // after any transaction has comitted. Normally this is too late to + // assign a replication-id but during broker start-up and recovery from + // store record() is not called, so set the ID now if not already set. + if (!m.hasReplicationId()) { + m.setReplicationId(nextId++); + QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m)); + } + } private: + std::string queue; sys::AtomicValue<uint32_t> nextId; }; diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index be3dc25653..e7d77f9810 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -148,7 +148,7 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) { Mutex::ScopedLock l(lock); if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. - QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m)); + QPID_LOG(trace, logPrefix << "Enqueue: " << logMessageId(*q, m.getReplicationId())); checkState(SENDING, "Too late for enqueue"); empty = false; enqueues[q] += m.getReplicationId(); @@ -163,7 +163,7 @@ void PrimaryTxObserver::dequeue( Mutex::ScopedLock l(lock); checkState(SENDING, "Too late for dequeue"); if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. - QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id)); + QPID_LOG(trace, logPrefix << "Dequeue: " << logMessageId(*q, pos, id)); empty = false; dequeues[q] += id; txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 6ffd53ff21..94b7a53937 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -72,7 +72,7 @@ void QueueGuard::enqueued(const Message& m) { ReplicationId id = m.getReplicationId(); Mutex::ScopedLock l(lock); if (cancelled) return; // Don't record enqueues after we are cancelled. - QPID_LOG(trace, logPrefix << "Delayed completion of " << LogMessageId(queue, m)); + QPID_LOG(trace, logPrefix << "Delayed completion of " << logMessageId(queue, m)); delayed[id] = m.getIngressCompletion(); m.getIngressCompletion()->startCompleter(); } @@ -80,7 +80,7 @@ void QueueGuard::enqueued(const Message& m) { // NOTE: Called with message lock held. void QueueGuard::dequeued(const Message& m) { ReplicationId id = m.getReplicationId(); - QPID_LOG(trace, logPrefix << "Dequeued " << LogMessageId(queue, m)); + QPID_LOG(trace, logPrefix << "Dequeued " << logMessageId(queue, m)); Mutex::ScopedLock l(lock); complete(id, l); } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 4e908fbe79..ca06fabe86 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -106,22 +106,34 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { class QueueReplicator::QueueObserver : public broker::QueueObserver { public: - QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {} - void enqueued(const Message&) {} - void dequeued(const Message&) {} + typedef boost::shared_ptr<QueueReplicator> Ptr; + QueueObserver(Ptr qr) : queueReplicator(qr) {} + + void enqueued(const Message& m) { + Ptr qr = queueReplicator.lock(); + if (qr) qr->enqueued(m); + } + + void dequeued(const Message& m) { + Ptr qr = queueReplicator.lock(); + if (qr) qr->dequeued(m); + } + void acquired(const Message&) {} void requeued(const Message&) {} void consumerAdded( const Consumer& ) {} void consumerRemoved( const Consumer& ) {} // Queue observer is destroyed when the queue is. void destroy() { - boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock(); + Ptr qr = queueReplicator.lock(); if (qr) qr->destroy(); } + private: boost::weak_ptr<QueueReplicator> queueReplicator; }; + boost::shared_ptr<QueueReplicator> QueueReplicator::create( HaBroker& hb, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l) { @@ -278,48 +290,73 @@ void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) { QPID_LOG(trace, logPrefix << "Dequeue " << e.ids); //TODO: should be able to optimise the following for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) { - PositionMap::iterator j = positions.find(*i); - if (j != positions.end()) queue->dequeueMessageAt(j->second); + QueuePosition position; + { + Mutex::ScopedLock l(lock); + PositionMap::iterator j = positions.find(*i); + if (j == positions.end()) continue; + position = j->second; + } + queue->dequeueMessageAt(position); // Outside lock, will call dequeued(). + // positions will be cleaned up in dequeued() } } // Called in connection thread of the queues bridge to primary. - void QueueReplicator::route(Deliverable& deliverable) { try { - Mutex::ScopedLock l(lock); - if (!queue) return; // Already destroyed broker::Message& message(deliverable.getMessage()); - string key(message.getRoutingKey()); - if (!isEventKey(message.getRoutingKey())) { + { + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed + string key(message.getRoutingKey()); + if (isEventKey(key)) { + DispatchMap::iterator i = dispatch.find(key); + if (i == dispatch.end()) { + QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key); + } else { + (i->second)(message.getContent(), l); + } + return; + } ReplicationId id = nextId++; - maxId = std::max(maxId, id); message.setReplicationId(id); - deliver(message); - QueuePosition position = queue->getPosition(); - positions[id] = position; - QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id)); - } - else { - DispatchMap::iterator i = dispatch.find(key); - if (i == dispatch.end()) { - QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key); - } - else { - (i->second)(message.getContent(), l); + PositionMap::iterator i = positions.find(id); + if (i != positions.end()) { + QPID_LOG(trace, logPrefix << "Already on queue: " << logMessageId(*queue, message)); + return; } + QPID_LOG(trace, logPrefix << "Received: " << logMessageId(*queue, message)); } + deliver(message); // Outside lock, will call enqueued() } catch (const std::exception& e) { haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what())); } + } void QueueReplicator::deliver(const broker::Message& m) { queue->deliver(m); } +// Called via QueueObserver when message is enqueued. Could be as part of deliver() +// or in a different thread if a message is enqueued via a transaction. +// +void QueueReplicator::enqueued(const broker::Message& m) { + Mutex::ScopedLock l(lock); + maxId = std::max(maxId, ReplicationId(m.getReplicationId())); + positions[m.getReplicationId()] = m.getSequence(); + QPID_LOG(trace, logPrefix << "Enqueued " << logMessageId(*queue, m)); +} + +// Called via QueueObserver +void QueueReplicator::dequeued(const broker::Message& m) { + Mutex::ScopedLock l(lock); + positions.erase(m.getReplicationId()); +} + void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) { nextId = decodeStr<IdEvent>(data).id; } @@ -349,8 +386,9 @@ std::string QueueReplicator::getType() const { return ReplicatingSubscription::Q void QueueReplicator::promoted() { if (queue) { // On primary QueueReplicator no longer sets IDs, start an IdSetter. + QPID_LOG(debug, logPrefix << "Promoted, first replication-id " << maxId+1) queue->getMessageInterceptors().add( - boost::shared_ptr<IdSetter>(new IdSetter(maxId+1))); + boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), maxId+1))); // Process auto-deletes if (queue->isAutoDelete()) { // Make a temporary shared_ptr to prevent premature deletion of queue. diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 757f12c7a9..3d525db440 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -80,6 +80,10 @@ class QueueReplicator : public broker::Exchange, void route(broker::Deliverable&); + // Called via QueueObserver + void enqueued(const broker::Message&); + void dequeued(const broker::Message&); + // Set if the queue has ever been subscribed to, used for auto-delete cleanup. void setSubscribed() { subscribed = true; } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 908458fad3..67e1e77681 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -216,14 +216,14 @@ bool ReplicatingSubscription::deliver( try { bool result = false; if (skipEnqueue.contains(id)) { - QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m)); + QPID_LOG(trace, logPrefix << "Skip " << logMessageId(*getQueue(), m)); skipEnqueue -= id; guard->complete(id); // This will never be acknowledged. notify(); result = true; } else { - QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m)); + QPID_LOG(trace, logPrefix << "Replicated " << logMessageId(*getQueue(), m)); if (!ready && !isGuarded(l)) unready += id; sendIdEvent(id, l); result = ConsumerImpl::deliver(c, m); @@ -231,7 +231,7 @@ bool ReplicatingSubscription::deliver( checkReady(l); return result; } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Error replicating " << LogMessageId(*getQueue(), m) + QPID_LOG(critical, logPrefix << "Error replicating " << logMessageId(*getQueue(), m) << ": " << e.what()); throw; } @@ -268,7 +268,7 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Finish completion of message, it has been acknowledged by the backup. ReplicationId id = r.getReplicationId(); QPID_LOG(trace, logPrefix << "Acknowledged " << - LogMessageId(*getQueue(), r.getMessageId(), id)); + logMessageId(*getQueue(), r.getMessageId(), id)); guard->complete(id); { Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 0e3f544d44..08c08b0ca3 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -137,8 +137,8 @@ class ReplicatingSubscription : BrokerInfo getBrokerInfo() const { return info; } -void skipEnqueues(const ReplicationIdSet& ids); -void skipDequeues(const ReplicationIdSet& ids); + void skipEnqueues(const ReplicationIdSet& ids); + void skipDequeues(const ReplicationIdSet& ids); protected: bool doDispatch(); diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index d2a647ae8f..ee8bd342b2 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -127,20 +127,19 @@ void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLoc } } -void TxReplicator::route(broker::Deliverable& deliverable) { - QueueReplicator::route(deliverable); -} - void TxReplicator::deliver(const broker::Message& m_) { - sys::Mutex::ScopedLock l(lock); - if (!txBuffer) return; - // Deliver message to the target queue, not the tx-queue. + boost::intrusive_ptr<broker::TxBuffer> txbuf; broker::Message m(m_); - m.setReplicationId(enq.id); // Use replicated id. - boost::shared_ptr<broker::Queue> queue = - haBroker.getBroker().getQueues().get(enq.queue); - QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m)); - DeliverableMessage dm(m, txBuffer.get()); + { + sys::Mutex::ScopedLock l(lock); + if (!txBuffer) return; + txbuf = txBuffer; + m.setReplicationId(enq.id); // Use enqueued replicated id. + } + // Deliver message to the target queue, not the tx-queue. + boost::shared_ptr<broker::Queue> queue = haBroker.getBroker().getQueues().get(enq.queue); + QPID_LOG(trace, logPrefix << "Deliver " << logMessageId(*queue, m.getReplicationId())); + DeliverableMessage dm(m, txbuf.get()); dm.deliverTo(queue); } diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h index 5c509d14a7..fe25fbc78b 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ b/qpid/cpp/src/qpid/ha/TxReplicator.h @@ -66,7 +66,6 @@ class TxReplicator : public QueueReplicator { std::string getType() const; // QueueReplicator overrides - void route(broker::Deliverable& deliverable); using QueueReplicator::destroy; void destroy(sys::Mutex::ScopedLock&); diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp index c02ae33470..00058a8a32 100644 --- a/qpid/cpp/src/qpid/ha/types.cpp +++ b/qpid/cpp/src/qpid/ha/types.cpp @@ -95,17 +95,24 @@ ostream& operator<<(ostream& o, const UuidSet& ids) { return o; } -LogMessageId::LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id) : - queue(q.getName()), position(pos), replicationId(id) {} -LogMessageId::LogMessageId(const broker::Queue& q, const broker::Message& m) : - queue(q.getName()), position(m.getSequence()), replicationId(m.getReplicationId()) {} - -LogMessageId::LogMessageId(const std::string& q, const broker::Message& m) : - queue(q), position(m.getSequence()), replicationId(m.getReplicationId()) {} - -std::ostream& operator<<(std::ostream& o, const LogMessageId& m) { - return o << m.queue << "[" << m.position << "]=" << m.replicationId; +std::string logMessageId(const std::string& q, QueuePosition pos, ReplicationId id) { + return Msg() << q << "[" << pos << "]" << "=" << id; +} +std::string logMessageId(const std::string& q, ReplicationId id) { + return Msg() << q << "[]" << "=" << id; +} +std::string logMessageId(const std::string& q, const broker::Message& m) { + return logMessageId(q, m.getSequence(), m.getReplicationId()); +} +std::string logMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id) { + return logMessageId(q.getName(), pos, id); +} +std::string logMessageId(const broker::Queue& q, ReplicationId id) { + return logMessageId(q.getName(), id); +} +std::string logMessageId(const broker::Queue& q, const broker::Message& m) { + return logMessageId(q.getName(), m); } void UuidSet::encode(framing::Buffer& b) const { diff --git a/qpid/cpp/src/qpid/ha/types.h b/qpid/cpp/src/qpid/ha/types.h index 92157d411b..ae4b948dfc 100644 --- a/qpid/cpp/src/qpid/ha/types.h +++ b/qpid/cpp/src/qpid/ha/types.h @@ -132,17 +132,13 @@ typedef framing::SequenceNumber ReplicationId; typedef framing::SequenceSet QueuePositionSet; typedef framing::SequenceSet ReplicationIdSet; -/** Helper for logging message ID */ -struct LogMessageId { - typedef boost::shared_ptr<broker::Queue> QueuePtr; - LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id); - LogMessageId(const broker::Queue& q, const broker::Message& m); - LogMessageId(const std::string& q, const broker::Message& m); - const std::string& queue; - QueuePosition position; - ReplicationId replicationId; -}; -std::ostream& operator<<(std::ostream&, const LogMessageId&); +/** Helpers for logging message ID */ +std::string logMessageId(const std::string& q, QueuePosition pos, ReplicationId id); +std::string logMessageId(const std::string& q, ReplicationId id); +std::string logMessageId(const std::string& q, const broker::Message& m); +std::string logMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id); +std::string logMessageId(const broker::Queue& q, ReplicationId id); +std::string logMessageId(const broker::Queue& q, const broker::Message& m); /** Return short version of human-readable UUID. */ inline std::string shortStr(const types::Uuid& uuid) { return uuid.str().substr(0,8); } diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 44824fe67e..461ef0de9a 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -428,17 +428,22 @@ class Broker(Popen): assert not error.search(line) or ignore.search(line), "Errors in log file %s: %s"%(log, line) finally: log.close() +def receiver_iter(receiver, timeout=0): + """Make an iterator out of a receiver. Returns messages till Empty is raised.""" + try: + while True: + yield receiver.fetch(timeout=timeout) + except qm.Empty: + pass + def browse(session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) r.capacity = 100 try: - contents = [] - try: - while True: contents.append(transform(r.fetch(timeout=timeout))) - except qm.Empty: pass - finally: r.close() - return contents + return [transform(m) for m in receiver_iter(r, timeout)] + finally: + r.close() def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"): """Assert that the contents of messages on queue (as retrieved diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f71560dffb..58b3ff2802 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1217,7 +1217,6 @@ class RecoveryTests(HaBrokerTest): def test_stalled_backup(self): """Make sure that a stalled backup broker does not stall the primary""" - # FIXME aconway 2014-04-15: merge with test_join_ready_cluster? cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"]) os.kill(cluster[1].pid, signal.SIGSTOP) s = cluster[0].connect().session() @@ -1272,7 +1271,7 @@ class StoreTests(HaBrokerTest): """Verify that a backup erases queue data from store recovery before doing catch-up from the primary.""" if self.check_skip(): return - cluster = HaCluster(self, 2) + cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store']) sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() s1 = sn.sender("q1;{create:always,node:{durable:true}}") for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True)) @@ -1362,9 +1361,61 @@ class TransactionTests(HaBrokerTest): tx.acknowledge() tx.commit() tx.sync() + tx.close() + + for b in cluster: + self.assert_simple_commit_outcome(b, tx_queues) + + # Verify non-tx dequeue is replicated correctly + c = cluster.connect(0, protocol=self.tx_protocol) + r = c.session().receiver("b") + ri = receiver_iter(r, timeout=1) + self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri]) + r.session.acknowledge() + for b in cluster: b.assert_browse_backup("b", [], msg=b) + + def check_enq_deq(self, cluster, queue, expect): + for b in cluster: + q = b.agent.getQueue(queue) + self.assertEqual( + (b.name,)+expect, + (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues)) + + def test_tx_enq_notx_deq(self): + """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" + cluster = HaCluster(self, 2, test_store=True) + c = cluster.connect(0, protocol=self.tx_protocol) + tx = c.session(transactional=True) + c.session().sender("qq;{create:always}").send("m1") + tx.sender("qq;{create:always}").send("tx") + tx.commit() tx.close() - for b in cluster: self.assert_simple_commit_outcome(b, tx_queues) + c.session().sender("qq;{create:always}").send("m2") + self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0)) + + notx = c.session() + self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))]) + notx.acknowledge() + self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0)) + for b in cluster: b.assert_browse_backup('qq', [], msg=b) + for b in cluster: self.assert_tx_clean(b) + + def test_tx_enq_notx_deq_qpid_send(self): + """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" + cluster = HaCluster(self, 2, test_store=True) + + self.popen( + ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1', + '--content-string=foo'] + ).assert_exit_ok() + for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b) + self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0)) + + self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok() + self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0)) + for b in cluster: b.assert_browse_backup('qq', [], msg=b) + for b in cluster: self.assert_tx_clean(b) def assert_tx_clean(self, b): """Verify that there are no transaction artifacts diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index a744d07a12..3d9941baee 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -205,7 +205,7 @@ struct Transfer : public TransactionalClient, public Runnable } session.commit(); t++; - if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl; + if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl; } catch (const TransactionAborted&) { std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl; session = connection.createTransactionalSession(); @@ -246,6 +246,16 @@ struct Controller : public Client for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { std::string address = *i + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE); + + // Clear out any garbage on queues. + Receiver receiver = session.createReceiver(address); + Message rmsg; + uint count(0); + while (receiver.fetch(rmsg, Duration::IMMEDIATE)) ++count; + session.acknowledge(); + receiver.close(); + if (!opts.quiet) std::cout << "Cleaned up " << count << " messages from " << *i << std::endl; + Sender sender = session.createSender(address); if (i == queues.begin()) { for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { |