summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp17
1 files changed, 11 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 47fc3afdeb..58c5e452d7 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -21,6 +21,7 @@
#include "Counter.h"
#include "QueueReplicator.h"
+#include "HaBroker.h"
#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
@@ -59,14 +60,15 @@ bool QueueReplicator::isEventKey(const std::string key) {
return ret;
}
-QueueReplicator::QueueReplicator(const LogPrefix& lp,
+QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
- logPrefix(lp), queue(q), link(l)
+ haBroker(hb), logPrefix(hb), queue(q), link(l)
{
- framing::Uuid uuid(true);
+ Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
+ logPrefix.setMessage(q->getName());
QPID_LOG(info, logPrefix << "Created");
}
@@ -109,12 +111,15 @@ void QueueReplicator::deactivate() {
// Called in a broker connection thread when the bridge is created.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
sys::Mutex::ScopedLock l(lock);
- framing::AMQP_ServerProxy peer(sessionHandler.out);
+ AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
- framing::FieldTable settings;
+ FieldTable settings;
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
- settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
+ settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER,
+ queue->getPosition());
+ settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
+ haBroker.getBrokerInfo().asFieldTable());
SequenceNumber front;
if (ReplicatingSubscription::getFront(*queue, front))
settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front);