From 1093062db03077998823cbefb9ca9645076694ea Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 17 Jun 2013 14:19:10 +0000 Subject: QPID-4348: HA Use independent sequence numbers for identifying messages Previously HA code used queue sequence numbers to identify messasges. This assumes that message sequence is identical on primary and backup. Implementing new features (for example transactions) requires that we tolerate ordering differences between primary and backups. This patch introduces a new, queue-scoped HA sequence number managed by the HA plugin. The HA ID is set *before* the message is enqueued and assigned a queue sequence number. This means it is possible to identify messages before they are enqueued, e.g. messages in an open transaction. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1493771 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Queue.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'cpp/src/qpid/broker/Queue.cpp') diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e068ce6fe4..3b68d4117b 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -285,9 +285,9 @@ void Queue::deliverTo(Message msg, TxBuffer* txn) } else { if (enqueue(0, msg)) { push(msg); - QPID_LOG(debug, "Message " << msg << " enqueued on " << name); + QPID_LOG(debug, "Message " << msg.getSequence() << " enqueued on " << name); } else { - QPID_LOG(debug, "Message " << msg << " dropped from " << name); + QPID_LOG(debug, "Message " << msg.getSequence() << " dropped from " << name); } } } @@ -415,7 +415,8 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) if (c->filter(*msg)) { if (c->accept(*msg)) { if (c->preAcquires()) { - QPID_LOG(debug, "Attempting to acquire message " << msg << " from '" << name << "' with state " << msg->getState()); + QPID_LOG(debug, "Attempting to acquire message " << msg->getSequence() + << " from '" << name << "' with state " << msg->getState()); if (allocator->acquire(c->getName(), *msg)) { if (mgmtObject) { mgmtObject->inc_acquires(); @@ -825,6 +826,7 @@ void Queue::setLastNodeFailure() */ bool Queue::enqueue(TransactionContext* ctxt, Message& msg) { + interceptors.record(msg); ScopedUse u(barrier); if (!u.acquired) return false; -- cgit v1.2.1