diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 81 |
1 files changed, 36 insertions, 45 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 946831319c..4c3c209eab 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -19,8 +19,10 @@ * */ +#include "makeMessage.h" #include "HaBroker.h" #include "QueueReplicator.h" +#include "QueueSnapshots.h" #include "ReplicatingSubscription.h" #include "Settings.h" #include "qpid/broker/Bridge.h" @@ -31,10 +33,10 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/SessionHandler.h" -#include "qpid/framing/SequenceSet.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" #include "qpid/Msg.h" +#include "qpid/assert.h" #include <boost/shared_ptr.hpp> namespace { @@ -51,7 +53,7 @@ using namespace std; using sys::Mutex; const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue"); -const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA+"position"); +const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id"); const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); std::string QueueReplicator::replicatorName(const std::string& queueName) { @@ -107,10 +109,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), haBroker(hb), - logPrefix("Backup queue "+q->getName()+": "), + logPrefix("Backup of "+q->getName()+": "), queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false), - settings(hb.getSettings()), destroyed(false) + settings(hb.getSettings()), destroyed(false), + nextId(0), maxId(0) { + QPID_LOG(debug, logPrefix << "Created"); args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); @@ -162,13 +166,12 @@ QueueReplicator::~QueueReplicator() {} // Called from Queue::destroyed() void QueueReplicator::destroy() { - QPID_LOG(debug, logPrefix << " destroyed"); boost::shared_ptr<Bridge> bridge2; // To call outside of lock { Mutex::ScopedLock l(lock); if (destroyed) return; destroyed = true; - QPID_LOG(debug, logPrefix << "Destroyed."); + QPID_LOG(debug, logPrefix << "Destroyed"); // Need to drop shared pointers to avoid pointer cycles keeping this in memory. queue.reset(); link.reset(); @@ -188,12 +191,10 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable arguments; arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); - arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? - arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition()); - arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable()); - SequenceNumber front, back; - queue->getRange(front, back, broker::REPLICATOR); - if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, front); + arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize? + arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable()); + arguments.setString(ReplicatingSubscription::QPID_ID_SET, + encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot())); try { peer.getMessage().subscribe( args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, @@ -222,51 +223,36 @@ template <class T> T decodeContent(Message& m) { } } -void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) { - if (destroyed) return; - queue->dequeueMessageAt(n); -} - -namespace { -bool getSequence(const Message& message, SequenceNumber& result) { - result = message.getSequence(); - return true; -} -bool getNext(broker::Queue& q, SequenceNumber position, SequenceNumber& result) { - QueueCursor cursor(REPLICATOR); - return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1); +void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Dequeue " << dequeues); + //TODO: should be able to optimise the following + for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) { + PositionMap::iterator j = positions.find(*i); + if (j != positions.end()) queue->dequeueMessageAt(j->second); + } } -} // namespace // Called in connection thread of the queues bridge to primary. void QueueReplicator::route(Deliverable& msg) { try { - const std::string& key = msg.getMessage().getRoutingKey(); Mutex::ScopedLock l(lock); if (destroyed) return; - if (!isEventKey(key)) { + const std::string& key = msg.getMessage().getRoutingKey(); + if (!isEventKey(key)) { // Replicated message + ReplicationId id = nextId++; + maxId = std::max(maxId, id); + msg.getMessage().setReplicationId(id); msg.deliverTo(queue); - // We are on a backup so the queue is not modified except via this. - QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + QueuePosition position = queue->getPosition(); + positions[id] = position; + QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id)); } else if (key == DEQUEUE_EVENT_KEY) { - SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); - //TODO: should be able to optimise the following - for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) - dequeue(*i, l); + dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l); } - else if (key == POSITION_EVENT_KEY) { - SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() - << " to " << position); - // Verify that there are no messages after the new position in the queue. - SequenceNumber next; - if (getNext(*queue, position, next)) - throw Exception(QPID_MSG(logPrefix << "Invalid position " << position - << " preceeds message at " << next)); - queue->setPosition(position); + else if (key == ID_EVENT_KEY) { + nextId = decodeContent<ReplicationId>(msg.getMessage()); } // Ignore unknown event keys, may be introduced in later versions. } @@ -275,6 +261,11 @@ void QueueReplicator::route(Deliverable& msg) } } +ReplicationId QueueReplicator::getMaxId() { + Mutex::ScopedLock l(lock); + return maxId; +} + // Unused Exchange methods. bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } |
