summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp81
1 files changed, 36 insertions, 45 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 946831319c..4c3c209eab 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -19,8 +19,10 @@
*
*/
+#include "makeMessage.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
+#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "qpid/broker/Bridge.h"
@@ -31,10 +33,10 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
-#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include "qpid/Msg.h"
+#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
namespace {
@@ -51,7 +53,7 @@ using namespace std;
using sys::Mutex;
const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue");
-const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA+"position");
+const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id");
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
@@ -107,10 +109,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
- logPrefix("Backup queue "+q->getName()+": "),
+ logPrefix("Backup of "+q->getName()+": "),
queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
- settings(hb.getSettings()), destroyed(false)
+ settings(hb.getSettings()), destroyed(false),
+ nextId(0), maxId(0)
{
+ QPID_LOG(debug, logPrefix << "Created");
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -162,13 +166,12 @@ QueueReplicator::~QueueReplicator() {}
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
- QPID_LOG(debug, logPrefix << " destroyed");
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
if (destroyed) return;
destroyed = true;
- QPID_LOG(debug, logPrefix << "Destroyed.");
+ QPID_LOG(debug, logPrefix << "Destroyed");
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
link.reset();
@@ -188,12 +191,10 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;
arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
- arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
- arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition());
- arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable());
- SequenceNumber front, back;
- queue->getRange(front, back, broker::REPLICATOR);
- if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, front);
+ arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
+ arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
+ arguments.setString(ReplicatingSubscription::QPID_ID_SET,
+ encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot()));
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -222,51 +223,36 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
- if (destroyed) return;
- queue->dequeueMessageAt(n);
-}
-
-namespace {
-bool getSequence(const Message& message, SequenceNumber& result) {
- result = message.getSequence();
- return true;
-}
-bool getNext(broker::Queue& q, SequenceNumber position, SequenceNumber& result) {
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1);
+void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Dequeue " << dequeues);
+ //TODO: should be able to optimise the following
+ for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) {
+ PositionMap::iterator j = positions.find(*i);
+ if (j != positions.end()) queue->dequeueMessageAt(j->second);
+ }
}
-} // namespace
// Called in connection thread of the queues bridge to primary.
void QueueReplicator::route(Deliverable& msg)
{
try {
- const std::string& key = msg.getMessage().getRoutingKey();
Mutex::ScopedLock l(lock);
if (destroyed) return;
- if (!isEventKey(key)) {
+ const std::string& key = msg.getMessage().getRoutingKey();
+ if (!isEventKey(key)) { // Replicated message
+ ReplicationId id = nextId++;
+ maxId = std::max(maxId, id);
+ msg.getMessage().setReplicationId(id);
msg.deliverTo(queue);
- // We are on a backup so the queue is not modified except via this.
- QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ QueuePosition position = queue->getPosition();
+ positions[id] = position;
+ QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
}
else if (key == DEQUEUE_EVENT_KEY) {
- SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
- //TODO: should be able to optimise the following
- for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
- dequeue(*i, l);
+ dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l);
}
- else if (key == POSITION_EVENT_KEY) {
- SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
- << " to " << position);
- // Verify that there are no messages after the new position in the queue.
- SequenceNumber next;
- if (getNext(*queue, position, next))
- throw Exception(QPID_MSG(logPrefix << "Invalid position " << position
- << " preceeds message at " << next));
- queue->setPosition(position);
+ else if (key == ID_EVENT_KEY) {
+ nextId = decodeContent<ReplicationId>(msg.getMessage());
}
// Ignore unknown event keys, may be introduced in later versions.
}
@@ -275,6 +261,11 @@ void QueueReplicator::route(Deliverable& msg)
}
}
+ReplicationId QueueReplicator::getMaxId() {
+ Mutex::ScopedLock l(lock);
+ return maxId;
+}
+
// Unused Exchange methods.
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }