summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp11
1 files changed, 5 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index be910a087f..ae53f89404 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -120,8 +120,10 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
brokerInfo.asFieldTable());
SequenceNumber front;
- if (ReplicatingSubscription::getFront(*queue, front))
+ if (ReplicatingSubscription::getFront(*queue, front)) {
settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
+ QPID_LOG(debug, "QPID_FRONT for " << queue->getName() << " is " << front);
+ }
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
false/*exclusive*/, "", 0, settings);
@@ -137,8 +139,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
namespace {
template <class T> T decodeContent(Message& m) {
- std::string content;
- m.getFrames().getContent(content);
+ std::string content = m.getContent();
Buffer buffer(const_cast<char*>(content.c_str()), content.size());
T result;
result.decode(buffer);
@@ -148,9 +149,7 @@ template <class T> T decodeContent(Message& m) {
void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
// Thread safe: only calls thread safe Queue functions.
- QueuedMessage message;
- if (queue->acquireMessageAt(n, message))
- queue->dequeue(0, message);
+ queue->dequeueMessageAt(n);
}
// Called in connection thread of the queues bridge to primary.