summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp42
1 files changed, 17 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 2001ec5332..9f464f8066 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "makeMessage.h"
+#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
#include "QueueReplicator.h"
@@ -36,6 +36,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
+#include <boost/pointer_cast.hpp>
#include <sstream>
@@ -45,6 +46,7 @@ namespace ha {
using namespace framing;
using namespace broker;
using namespace std;
+using namespace boost;
using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;
@@ -111,7 +113,8 @@ ReplicatingSubscription::ReplicatingSubscription(
) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
position(0), ready(false), cancelled(false),
- haBroker(hb)
+ haBroker(hb),
+ primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
{
try {
FieldTable ft;
@@ -137,8 +140,6 @@ ReplicatingSubscription::ReplicatingSubscription(
}
// If there's already a guard (we are in failover) use it, else create one.
- boost::shared_ptr<Primary> primary =
- boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
@@ -163,7 +164,6 @@ ReplicatingSubscription::ReplicatingSubscription(
sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
dequeues += initDequeues; // Messages on backup that are not on primary.
skip = backupIds - initDequeues; // Messages already on the backup.
-
// Queue front is moving but we know this subscriptions will start at a
// position >= front so if front is safe then position must be.
position = front;
@@ -191,6 +191,7 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
//
void ReplicatingSubscription::initialize() {
try {
+ if (primary) primary->addReplica(*this);
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues to the backup.
// There must be a shared_ptr(this) when sending.
@@ -218,9 +219,8 @@ bool ReplicatingSubscription::deliver(
try {
bool result = false;
if (skip.contains(id)) {
+ QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
skip -= id;
- QPID_LOG(trace, logPrefix << "On backup, skip " <<
- LogMessageId(*getQueue(), m));
guard->complete(id); // This will never be acknowledged.
notify();
result = true;
@@ -240,17 +240,12 @@ bool ReplicatingSubscription::deliver(
}
}
-/**
- *@param position: must be <= last position seen by subscription.
- */
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
if (!ready && isGuarded(l) && unready.empty()) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
QPID_LOG(debug, logPrefix << "Caught up");
- boost::shared_ptr<Primary> primary =
- boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
if (primary) primary->readyReplica(*this);
}
}
@@ -264,6 +259,7 @@ void ReplicatingSubscription::cancel()
cancelled = true;
}
QPID_LOG(debug, logPrefix << "Cancelled");
+ if (primary) primary->removeReplica(*this);
getQueue()->removeObserver(observer);
guard->cancel();
ConsumerImpl::cancel();
@@ -289,9 +285,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
- string buffer = encodeStr(dequeues);
- dequeues.clear();
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+ sendEvent(DequeueEvent(dequeues), l);
}
// Called after the message has been removed
@@ -311,23 +305,16 @@ void ReplicatingSubscription::dequeued(ReplicationId id)
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
{
- sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l);
+ sendEvent(IdEvent(pos), l);
}
-void ReplicatingSubscription::sendEvent(const std::string& key,
- const std::string& buffer,
- Mutex::ScopedLock&)
+void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&)
{
Mutex::ScopedUnlock u(lock);
- broker::Message message = makeMessage(buffer);
- MessageTransfer& transfer = MessageTransfer::get(message);
- DeliveryProperties* props =
- transfer.getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey(key);
// Send the event directly to the base consumer implementation. The dummy
// consumer prevents acknowledgements being handled, which is what we want
// for events
- ConsumerImpl::deliver(QueueCursor(), message, boost::shared_ptr<Consumer>());
+ ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr<Consumer>());
}
// Called in subscription's connection thread.
@@ -346,4 +333,9 @@ bool ReplicatingSubscription::doDispatch()
}
}
+void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) {
+ Mutex::ScopedLock l(lock);
+ skip += ids;
+}
+
}} // namespace qpid::ha