summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp57
1 files changed, 23 insertions, 34 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 6f7519cd1f..933716e8fa 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -91,25 +91,6 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
-namespace {
-bool getSequence(const Message& message, SequenceNumber& result)
-{
- result = message.getSequence();
- return true;
-}
-}
-bool ReplicatingSubscription::getNext(
- broker::Queue& q, SequenceNumber from, SequenceNumber& result)
-{
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from);
-}
-
-bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front)));
-}
-
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -157,7 +138,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// Set a log prefix message that identifies the remote broker.
ostringstream os;
- os << "Primary " << queue->getName() << "@" << info.getLogId() << ": ";
+ os << "Primary " << queue->getName() << "@" << info << ": ";
logPrefix = os.str();
// NOTE: Once the guard is attached we can have concurrent
@@ -171,6 +152,7 @@ ReplicatingSubscription::ReplicatingSubscription(
guard->attach(*this);
QueueRange backup(arguments); // Remote backup range.
+ QueueRange backupOriginal(backup);
QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
backupPosition = backup.back;
@@ -207,7 +189,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// queue and hasn't been tampered with then that will be the case.
QPID_LOG(debug, logPrefix << "Subscribed:"
- << " backup:" << backup
+ << " backup:" << backupOriginal << " adjusted backup:" << backup
<< " primary:" << primary
<< " catch-up: " << position << "-" << primary.back
<< "(" << primary.back-position << ")");
@@ -222,9 +204,7 @@ ReplicatingSubscription::ReplicatingSubscription(
}
}
-ReplicatingSubscription::~ReplicatingSubscription() {
- QPID_LOG(debug, logPrefix << "Detroyed replicating subscription");
-}
+ReplicatingSubscription::~ReplicatingSubscription() {}
// Called in subscription's connection thread when the subscription is created.
// Called separate from ctor because sending events requires
@@ -248,19 +228,20 @@ void ReplicatingSubscription::initialize() {
}
// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) {
- position = m.getSequence();
+bool ReplicatingSubscription::deliver(
+ const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
+{
try {
- QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
{
Mutex::ScopedLock l(lock);
- //FIXME GRS: position is no longer set//assert(position == m.getSequence());
+ position = m.getSequence();
- // m.getSequence() is the position of the newly enqueued message on local queue.
+ // m.getSequence() is the position of the new message on local queue.
// backupPosition is latest position on backup queue before enqueueing
if (m.getSequence() <= backupPosition)
throw Exception(
- QPID_MSG("Expected position > " << backupPosition
+ QPID_MSG(logPrefix << "Expected position > " << backupPosition
<< " but got " << m.getSequence()));
if (m.getSequence() - backupPosition > 1) {
// Position has advanced because of messages dequeued ahead of us.
@@ -272,7 +253,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const
}
return ConsumerImpl::deliver(c, m);
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"
+ QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
<< ": " << e.what());
throw;
}
@@ -292,6 +273,7 @@ void ReplicatingSubscription::setReady() {
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
+ QPID_LOG(debug, logPrefix << "Cancelled");
guard->cancel();
ConsumerImpl::cancel();
}
@@ -299,7 +281,7 @@ void ReplicatingSubscription::cancel()
// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]");
+ QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId());
guard->complete(r.getMessageId());
// If next message is protected by the guard then we are ready
if (r.getMessageId() >= guard->getRange().back) setReady();
@@ -328,7 +310,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const Message& m)
{
- QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
{
Mutex::ScopedLock l(lock);
dequeues.add(m.getSequence());
@@ -396,7 +378,14 @@ bool ReplicatingSubscription::doDispatch()
Mutex::ScopedLock l(lock);
if (!dequeues.empty()) sendDequeueEvent(l);
}
- return ConsumerImpl::doDispatch();
+ try {
+ return ConsumerImpl::doDispatch();
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2012-10-05: detect queue deletion, no warning.
+ QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
+ return false;
+ }
}
}} // namespace qpid::ha