summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-09-19 21:42:21 +0000
committerAlan Conway <aconway@apache.org>2012-09-19 21:42:21 +0000
commit653f560b36eb7070303eabc0aad0e0ab5fe47f8f (patch)
tree66c570fe1663016cda22ccb40da00a6ac28c8cb0 /cpp
parenta770b917bad25a996b2177f83ca9c38df7161edf (diff)
downloadqpid-python-653f560b36eb7070303eabc0aad0e0ab5fe47f8f.tar.gz
QPID-4322: HA sporadic failure in ha_tests
Added Queue::getRange to get range atomically, fixes races around getting the front and backup of the range as two separate operations. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1387785 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp10
-rw-r--r--cpp/src/qpid/broker/Queue.h17
-rw-r--r--cpp/src/qpid/ha/QueueRange.h11
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp19
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp24
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h11
-rwxr-xr-xcpp/src/tests/ha_tests.py2
8 files changed, 48 insertions, 48 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 6479e47799..afa5623ecd 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -230,7 +230,7 @@ void Link::startConnectionLH ()
setStateLH(STATE_CONNECTING);
broker->connect (host, boost::lexical_cast<std::string>(port), transport,
boost::bind (&Link::closed, this, _1, _2));
- QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
+ QPID_LOG (info, "Inter-broker link connecting to " << host << ":" << port);
} catch(const std::exception& e) {
QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
<< e.what());
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 9be3a1acac..276e17a8b5 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -1390,6 +1390,16 @@ SequenceNumber Queue::getPosition() {
return sequence;
}
+void Queue::getRange(framing::SequenceNumber& front, framing::SequenceNumber& back,
+ SubscriptionType type)
+{
+ Mutex::ScopedLock locker(messageLock);
+ QueueCursor cursor(type);
+ back = sequence;
+ Message* message = messages->next(cursor);
+ front = message ? message->getSequence() : back+1;
+}
+
int Queue::getEventMode() { return eventMode; }
void Queue::recoveryComplete(ExchangeRegistry& exchanges)
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 671a24d53e..d52afec6b9 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -375,9 +375,24 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/**
*@return sequence number for the back of the queue. The next message pushed
- * will be at getPosition+1
+ * will be at getPosition()+1
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
+
+ /**
+ * Set front and back.
+ * If the queue is empty then front = back+1 (the first message to
+ * consume will be the next message pushed.)
+ *
+ *@param front = Position of first message to consume.
+ *@param back = getPosition(), next message pushed will be getPosition()+1
+ *@param type Subscription type to use to determine the front.
+ */
+ QPID_BROKER_EXTERN void getRange(
+ framing::SequenceNumber& front, framing::SequenceNumber& back,
+ SubscriptionType type=CONSUMER
+ );
+
QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
diff --git a/cpp/src/qpid/ha/QueueRange.h b/cpp/src/qpid/ha/QueueRange.h
index d734326910..f67ac146e6 100644
--- a/cpp/src/qpid/ha/QueueRange.h
+++ b/cpp/src/qpid/ha/QueueRange.h
@@ -24,6 +24,7 @@
#include "ReplicatingSubscription.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/SequenceNumber.h"
#include <iostream>
@@ -51,15 +52,7 @@ struct QueueRange {
QueueRange() : front(1), back(0) { } // Empty range.
- QueueRange(broker::Queue& q) {
- if (ReplicatingSubscription::getFront(q, front))
- back = q.getPosition();
- else {
- back = q.getPosition();
- front = back+1; // empty
- }
- assert(front <= back + 1);
- }
+ QueueRange(broker::Queue& q) { q.getRange(front, back, broker::REPLICATOR); }
QueueRange(const framing::FieldTable& args) {
back = args.getAsInt(ReplicatingSubscription::QPID_BACK);
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 8baecb0e0f..82daad9d9c 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -119,9 +119,9 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
queue->getPosition());
settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
brokerInfo.asFieldTable());
- SequenceNumber front;
- if (ReplicatingSubscription::getFront(*queue, front))
- settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
+ SequenceNumber front, back;
+ queue->getRange(front, back, broker::REPLICATOR);
+ if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
QPID_LOG(debug, logPrefix << " subscribe with settings " << settings);
peer.getMessage().subscribe(
@@ -152,6 +152,17 @@ void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
queue->dequeueMessageAt(n);
}
+namespace {
+bool getSequence(const Message& message, SequenceNumber& result) {
+ result = message.getSequence();
+ return true;
+}
+bool getNext(broker::Queue& q, SequenceNumber position, SequenceNumber& result) {
+ QueueCursor cursor(REPLICATOR);
+ return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1);
+}
+} // namespace
+
// Called in connection thread of the queues bridge to primary.
void QueueReplicator::route(Deliverable& msg)
{
@@ -176,7 +187,7 @@ void QueueReplicator::route(Deliverable& msg)
<< " to " << position);
// Verify that there are no messages after the new position in the queue.
SequenceNumber next;
- if (ReplicatingSubscription::getNext(*queue, position, next))
+ if (getNext(*queue, position, next))
throw Exception(QPID_MSG(logPrefix << "Invalid position " << position
<< " preceeds message at " << next));
queue->setPosition(position);
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 5fcb316ce6..ae6e7181d1 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -91,25 +91,6 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
-namespace {
-bool getSequence(const Message& message, SequenceNumber& result)
-{
- result = message.getSequence();
- return true;
-}
-}
-bool ReplicatingSubscription::getNext(
- broker::Queue& q, SequenceNumber position, SequenceNumber& result)
-{
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1);
-}
-
-bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front)));
-}
-
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -171,6 +152,7 @@ ReplicatingSubscription::ReplicatingSubscription(
guard->attach(*this);
QueueRange backup(arguments); // Remote backup range.
+ QueueRange backupOriginal(backup);
QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
backupPosition = backup.back;
@@ -207,7 +189,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// queue and hasn't been tampered with then that will be the case.
QPID_LOG(debug, logPrefix << "Subscribed:"
- << " backup:" << backup
+ << " backup:" << backupOriginal << " adjusted backup:" << backup
<< " primary:" << primary
<< " catch-up: " << position << "-" << primary.back
<< "(" << primary.back-position << ")");
@@ -258,7 +240,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const
// backupPosition is latest position on backup queue before enqueueing
if (m.getSequence() <= backupPosition)
throw Exception(
- QPID_MSG("Expected position > " << backupPosition
+ QPID_MSG(logPrefix << "Expected position > " << backupPosition
<< " but got " << m.getSequence()));
if (m.getSequence() - backupPosition > 1) {
// Position has advanced because of messages dequeued ahead of us.
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
index 4f54ffce96..f714e8e01a 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -80,17 +80,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
static const std::string QPID_FRONT;
static const std::string QPID_BROKER_INFO;
- // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription
- /** Get position of front message on queue.
- *@return false if queue is empty.
- */
- static bool getFront(broker::Queue&, framing::SequenceNumber& result);
- /** Get next message after position in queue.
- *@return false if none found.
- */
- static bool getNext(broker::Queue&, framing::SequenceNumber position,
- framing::SequenceNumber& result);
-
ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index e32077f219..64d87fbc86 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -886,7 +886,7 @@ class LongTests(BrokerTest):
"""Wait for receiver r to pass n"""
def check():
r.check() # Verify no exceptions
- return r.received > n
+ return r.received > n + 100
assert retry(check), "Stalled %s at %s"%(r.queue, n)
for r in receivers: wait_passed(r, 0)