diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 81 |
1 files changed, 49 insertions, 32 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index d99602fdda..28e9dc4120 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -19,12 +19,13 @@ * */ -#include "makeMessage.h" +#include "Event.h" #include "HaBroker.h" #include "QueueReplicator.h" #include "QueueSnapshots.h" #include "ReplicatingSubscription.h" #include "Settings.h" +#include "types.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -38,36 +39,32 @@ #include "qpid/Msg.h" #include "qpid/assert.h" #include <boost/shared_ptr.hpp> +#include <boost/bind.hpp> -namespace { -const std::string QPID_REPLICATOR_("qpid.replicator-"); -const std::string TYPE_NAME("qpid.queue-replicator"); -const std::string QPID_HA("qpid.ha-"); -} namespace qpid { namespace ha { using namespace broker; using namespace framing; using namespace std; +using namespace boost; +using std::exception; using sys::Mutex; -const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue"); -const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id"); const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -std::string QueueReplicator::replicatorName(const std::string& queueName) { - return QPID_REPLICATOR_ + queueName; +namespace { +const string QPID_HA(QPID_HA_PREFIX); +const std::string TYPE_NAME(QPID_HA+"queue-replicator"); } -bool QueueReplicator::isReplicatorName(const std::string& name) { - return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0; + +std::string QueueReplicator::replicatorName(const std::string& queueName) { + return QUEUE_REPLICATOR_PREFIX + queueName; } -bool QueueReplicator::isEventKey(const std::string key) { - const std::string& prefix = QPID_HA; - bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; - return ret; +bool QueueReplicator::isReplicatorName(const std::string& name) { + return startsWith(name, QUEUE_REPLICATOR_PREFIX); } class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { @@ -109,12 +106,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), haBroker(hb), + brokerInfo(hb.getBrokerInfo()), logPrefix("Backup of "+q->getName()+": "), - queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false), + queue(q), link(l), subscribed(false), settings(hb.getSettings()), destroyed(false), nextId(0), maxId(0) { - QPID_LOG(debug, logPrefix << "Created"); args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); @@ -122,12 +119,18 @@ QueueReplicator::QueueReplicator(HaBroker& hb, args.setString(QPID_REPLICATE, printable(NONE).str()); setArgs(args); if (q->isAutoDelete()) q->markInUse(); + + dispatch[DequeueEvent::KEY] = + boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2); + dispatch[IdEvent::KEY] = + boost::bind(&QueueReplicator::idEvent, this, _1, _2); } // This must be called immediately after the constructor. // It has to be separate so we can call shared_from_this(). void QueueReplicator::activate() { Mutex::ScopedLock l(lock); + QPID_LOG(debug, logPrefix << "Created"); if (!queue) return; // Already destroyed // Enable callback to route() @@ -224,44 +227,57 @@ template <class T> T decodeContent(Message& m) { } } -void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "Dequeue " << dequeues); +void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) { + DequeueEvent e; + decodeStr(data, e); + QPID_LOG(trace, logPrefix << "Dequeue " << e.ids); //TODO: should be able to optimise the following - for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) { + for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) { PositionMap::iterator j = positions.find(*i); if (j != positions.end()) queue->dequeueMessageAt(j->second); } } // Called in connection thread of the queues bridge to primary. -void QueueReplicator::route(Deliverable& msg) +void QueueReplicator::route(Deliverable& deliverable) { try { Mutex::ScopedLock l(lock); if (destroyed) return; - const std::string& key = msg.getMessage().getRoutingKey(); - if (!isEventKey(key)) { // Replicated message + broker::Message& message(deliverable.getMessage()); + string key(message.getRoutingKey()); + if (!isEventKey(message.getRoutingKey())) { ReplicationId id = nextId++; maxId = std::max(maxId, id); - msg.getMessage().setReplicationId(id); - msg.deliverTo(queue); + message.setReplicationId(id); + deliver(message); QueuePosition position = queue->getPosition(); positions[id] = position; QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id)); } - else if (key == DEQUEUE_EVENT_KEY) { - dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l); + else { + DispatchMap::iterator i = dispatch.find(key); + if (i == dispatch.end()) { + QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key); + } + else { + (i->second)(message.getContent(), l); + } } - else if (key == ID_EVENT_KEY) { - nextId = decodeContent<ReplicationId>(msg.getMessage()); - } - // Ignore unknown event keys, may be introduced in later versions. } catch (const std::exception& e) { haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what())); } } +void QueueReplicator::deliver(const broker::Message& m) { + queue->deliver(m); +} + +void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) { + nextId = decodeStr<IdEvent>(data).id; +} + ReplicationId QueueReplicator::getMaxId() { Mutex::ScopedLock l(lock); return maxId; @@ -273,4 +289,5 @@ bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; } std::string QueueReplicator::getType() const { return TYPE_NAME; } + }} // namespace qpid::broker |