summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp71
1 files changed, 44 insertions, 27 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index af6180305d..91a4538bc4 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -87,10 +87,13 @@ ReplicatingSubscription::ReplicatingSubscription(
events(new Queue(mask(name))),
consumer(new DelegatingConsumer(*this))
{
+ // Separate the remote part from a "local-remote" address.
+ string address = parent->getSession().getConnection().getUrl();
+ size_t i = address.find('-');
+ if (i != string::npos) address = address.substr(i+1);
+ logPrefix = "HA: Primary ";
stringstream ss;
- ss << "HA: Primary: " << getQueue()->getName() << " at "
- << parent->getSession().getConnection().getUrl() << ": ";
- logPrefix = ss.str();
+ logSuffix = " (" + address + ")";
// FIXME aconway 2011-12-09: Failover optimization removed.
// There was code here to re-use messages already on the backup
@@ -99,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// can be re-introduced later. Last revision with the optimization:
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
- QPID_LOG(debug, logPrefix << "Created backup subscription " << getName());
+ QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix);
// FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
// so we will start consuming from the lowest numbered message.
@@ -109,23 +112,36 @@ ReplicatingSubscription::ReplicatingSubscription(
// Message is delivered in the subscription's connection thread.
bool ReplicatingSubscription::deliver(QueuedMessage& m) {
- // Add position events for the subscribed queue, not for the internal event queue.
- if (m.queue && m.queue == getQueue().get()) {
- sys::Mutex::ScopedLock l(lock);
- assert(position == m.position);
- // m.position is the position of the newly enqueued m on the local queue.
- // backupPosition is latest position on the backup queue (before enqueueing m.)
- assert(m.position > backupPosition);
- if (m.position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(m.position);
- --send; // Send the position before m was enqueued.
- sendPositionEvent(send, l);
+ try {
+ // Add position events for the subscribed queue, not for the internal event queue.
+ if (m.queue && m.queue == getQueue().get()) {
+ sys::Mutex::ScopedLock l(lock);
+ if (position != m.position)
+ throw Exception(
+ QPID_MSG("Expected position " << position
+ << " but got " << m.position));
+ // m.position is the position of the newly enqueued m on the local queue.
+ // backupPosition is latest position on the backup queue (before enqueueing m.)
+ if (m.position <= backupPosition)
+ throw Exception(
+ QPID_MSG("Expected position > " << backupPosition
+ << " but got " << m.position));
+
+ if (m.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ SequenceNumber send(m.position);
+ --send; // Send the position before m was enqueued.
+ sendPositionEvent(send, l);
+ }
+ backupPosition = m.position;
+ QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix);
}
- backupPosition = m.position;
- QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
+ return ConsumerImpl::deliver(m);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName()
+ << logSuffix << ": " << e.what());
+ throw;
}
- return ConsumerImpl::deliver(m);
}
ReplicatingSubscription::~ReplicatingSubscription() {}
@@ -139,7 +155,7 @@ void ReplicatingSubscription::complete(
{
// Handle completions for the subscribed queue, not the internal event queue.
if (qm.queue && qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "Completed message " << qm.position);
+ QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
Delayed::iterator i= delayed.find(qm.position);
// The same message can be completed twice, by acknowledged and
// dequeued, remove it from the set so it only gets completed
@@ -157,7 +173,7 @@ void ReplicatingSubscription::complete(
void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
sys::Mutex::ScopedLock l(lock);
// Delay completion
- QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position);
+ QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
qm.payload->getIngressCompletion().startCompleter();
assert(delayed.find(qm.position) == delayed.end());
delayed[qm.position] = qm;
@@ -168,7 +184,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
void ReplicatingSubscription::cancelComplete(
const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
{
- QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position);
+ QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix);
v.second.payload->getIngressCompletion().finishCompleter();
}
@@ -179,7 +195,7 @@ void ReplicatingSubscription::cancel()
boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
+ QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix);
for_each(delayed.begin(), delayed.end(),
boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
delayed.clear();
@@ -201,7 +217,8 @@ bool ReplicatingSubscription::hideDeletedError() { return true; }
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
{
- QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
+ QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
+ << " from " << getQueue()->getName() << logSuffix);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
@@ -216,7 +233,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position);
+ QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
dequeues.add(qm.position);
// If we have not yet sent this message to the backup, then
// complete it now as it will never be accepted.
@@ -229,8 +246,8 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
void ReplicatingSubscription::sendPositionEvent(
SequenceNumber position, const sys::Mutex::ScopedLock&l )
{
- QPID_LOG(trace, logPrefix << "Sending position " << position
- << ", was " << backupPosition);
+ QPID_LOG(trace, logPrefix << "sending position " << position
+ << ", was " << backupPosition << logSuffix);
string buf(backupPosition.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
position.encode(buffer);