summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp81
1 files changed, 49 insertions, 32 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index d99602fdda..28e9dc4120 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -19,12 +19,13 @@
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "types.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -38,36 +39,32 @@
#include "qpid/Msg.h"
#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
-namespace {
-const std::string QPID_REPLICATOR_("qpid.replicator-");
-const std::string TYPE_NAME("qpid.queue-replicator");
-const std::string QPID_HA("qpid.ha-");
-}
namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
using namespace std;
+using namespace boost;
+using std::exception;
using sys::Mutex;
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue");
-const std::string QueueReplicator::ID_EVENT_KEY(QPID_HA+"id");
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-std::string QueueReplicator::replicatorName(const std::string& queueName) {
- return QPID_REPLICATOR_ + queueName;
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+const std::string TYPE_NAME(QPID_HA+"queue-replicator");
}
-bool QueueReplicator::isReplicatorName(const std::string& name) {
- return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0;
+
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+ return QUEUE_REPLICATOR_PREFIX + queueName;
}
-bool QueueReplicator::isEventKey(const std::string key) {
- const std::string& prefix = QPID_HA;
- bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
- return ret;
+bool QueueReplicator::isReplicatorName(const std::string& name) {
+ return startsWith(name, QUEUE_REPLICATOR_PREFIX);
}
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
@@ -109,12 +106,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
+ brokerInfo(hb.getBrokerInfo()),
logPrefix("Backup of "+q->getName()+": "),
- queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
+ queue(q), link(l), subscribed(false),
settings(hb.getSettings()), destroyed(false),
nextId(0), maxId(0)
{
- QPID_LOG(debug, logPrefix << "Created");
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -122,12 +119,18 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
if (q->isAutoDelete()) q->markInUse();
+
+ dispatch[DequeueEvent::KEY] =
+ boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
+ dispatch[IdEvent::KEY] =
+ boost::bind(&QueueReplicator::idEvent, this, _1, _2);
}
// This must be called immediately after the constructor.
// It has to be separate so we can call shared_from_this().
void QueueReplicator::activate() {
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, logPrefix << "Created");
if (!queue) return; // Already destroyed
// Enable callback to route()
@@ -224,44 +227,57 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(const ReplicationIdSet& dequeues, Mutex::ScopedLock&) {
- QPID_LOG(trace, logPrefix << "Dequeue " << dequeues);
+void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
+ DequeueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Dequeue " << e.ids);
//TODO: should be able to optimise the following
- for (ReplicationIdSet::iterator i = dequeues.begin(); i != dequeues.end(); ++i) {
+ for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) {
PositionMap::iterator j = positions.find(*i);
if (j != positions.end()) queue->dequeueMessageAt(j->second);
}
}
// Called in connection thread of the queues bridge to primary.
-void QueueReplicator::route(Deliverable& msg)
+void QueueReplicator::route(Deliverable& deliverable)
{
try {
Mutex::ScopedLock l(lock);
if (destroyed) return;
- const std::string& key = msg.getMessage().getRoutingKey();
- if (!isEventKey(key)) { // Replicated message
+ broker::Message& message(deliverable.getMessage());
+ string key(message.getRoutingKey());
+ if (!isEventKey(message.getRoutingKey())) {
ReplicationId id = nextId++;
maxId = std::max(maxId, id);
- msg.getMessage().setReplicationId(id);
- msg.deliverTo(queue);
+ message.setReplicationId(id);
+ deliver(message);
QueuePosition position = queue->getPosition();
positions[id] = position;
QPID_LOG(trace, logPrefix << "Enqueued " << LogMessageId(*queue,position,id));
}
- else if (key == DEQUEUE_EVENT_KEY) {
- dequeue(decodeContent<ReplicationIdSet>(msg.getMessage()), l);
+ else {
+ DispatchMap::iterator i = dispatch.find(key);
+ if (i == dispatch.end()) {
+ QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
+ }
+ else {
+ (i->second)(message.getContent(), l);
+ }
}
- else if (key == ID_EVENT_KEY) {
- nextId = decodeContent<ReplicationId>(msg.getMessage());
- }
- // Ignore unknown event keys, may be introduced in later versions.
}
catch (const std::exception& e) {
haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
}
}
+void QueueReplicator::deliver(const broker::Message& m) {
+ queue->deliver(m);
+}
+
+void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
+ nextId = decodeStr<IdEvent>(data).id;
+}
+
ReplicationId QueueReplicator::getMaxId() {
Mutex::ScopedLock l(lock);
return maxId;
@@ -273,4 +289,5 @@ bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
}} // namespace qpid::broker