summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp40
1 files changed, 24 insertions, 16 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index b43658365c..eda3f96180 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -21,8 +21,9 @@
#include "Event.h"
#include "HaBroker.h"
+#include "IdSetter.h"
#include "QueueReplicator.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "types.h"
@@ -122,6 +123,11 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
settings(hb.getSettings()),
nextId(0), maxId(0)
{
+ // The QueueReplicator will take over setting replication IDs.
+ boost::shared_ptr<IdSetter> setter =
+ q->getMessageInterceptors().findType<IdSetter>();
+ if (setter) q->getMessageInterceptors().remove(setter);
+
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -212,8 +218,9 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
- arguments.setString(ReplicatingSubscription::QPID_ID_SET,
- encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot()));
+ boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
+ if (qs) arguments.setString(ReplicatingSubscription::QPID_ID_SET, encodeStr(qs->getSnapshot()));
+
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -254,6 +261,7 @@ void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
}
// Called in connection thread of the queues bridge to primary.
+
void QueueReplicator::route(Deliverable& deliverable)
{
try {
@@ -293,11 +301,6 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
-ReplicationId QueueReplicator::getMaxId() {
- Mutex::ScopedLock l(lock);
- return maxId;
-}
-
void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
// If the queue is destroyed at the same time we are subscribing, we may
@@ -320,14 +323,19 @@ bool QueueReplicator::hasBindings() { return false; }
std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
void QueueReplicator::promoted() {
- // Promoted to primary, deal with auto-delete now.
- if (queue && queue->isAutoDelete() && subscribed) {
- // Make a temporary shared_ptr to prevent premature deletion of queue.
- // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
- // which could delete the queue while it's still running it's destroyed logic.
- boost::shared_ptr<Queue> q(queue);
- q->releaseFromUse();
- q->scheduleAutoDelete();
+ if (queue) {
+ // On primary QueueReplicator no longer sets IDs, start an IdSetter.
+ queue->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
+ // Process auto-deletes
+ if (queue->isAutoDelete() && subscribed) {
+ // Make a temporary shared_ptr to prevent premature deletion of queue.
+ // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+ // which could delete the queue while it's still running it's destroyed logic.
+ boost::shared_ptr<Queue> q(queue);
+ q->releaseFromUse();
+ q->scheduleAutoDelete();
+ }
}
}