summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp95
1 files changed, 47 insertions, 48 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index cdfe9dd888..d0b93da85f 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -25,15 +25,16 @@
#include "QueueReplicator.h"
#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
+#include "TxReplicatingSubscription.h"
#include "Primary.h"
#include "HaBroker.h"
#include "qpid/assert.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
#include <sstream>
@@ -47,22 +48,12 @@ using namespace broker;
using namespace std;
using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;
-
-const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
-const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
-const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info");
-
-class ReplicatingSubscription::QueueObserver : public broker::QueueObserver {
- public:
- QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {}
- void enqueued(const broker::Message&) {}
- void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); }
- void acquired(const broker::Message&) {}
- void requeued(const broker::Message&) {}
- private:
- ReplicatingSubscription& rs;
-};
-
+namespace { const string QPID_HA(QPID_HA_PREFIX); }
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION(QPID_HA+"repsub");
+const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info");
+const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids");
+const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep");
+const string ReplicatingSubscription::QPID_TX_REPLICATOR(QPID_HA+"txrep");
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
@@ -79,13 +70,20 @@ ReplicatingSubscription::Factory::create(
const framing::FieldTable& arguments
) {
boost::shared_ptr<ReplicatingSubscription> rs;
- if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
+ std::string type = arguments.getAsString(QPID_REPLICATING_SUBSCRIPTION);
+ if (type == QPID_QUEUE_REPLICATOR) {
rs.reset(new ReplicatingSubscription(
haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
- rs->initialize();
}
+ else if (type == QPID_TX_REPLICATOR) {
+ rs.reset(new TxReplicatingSubscription(
+ haBroker,
+ parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
+ }
+ if (rs) rs->initialize();
return rs;
}
@@ -100,7 +98,7 @@ ReplicatingSubscription::ReplicatingSubscription(
HaBroker& hb,
SemanticState* parent,
const string& name,
- Queue::shared_ptr queue,
+ Queue::shared_ptr queue_,
bool ack,
bool /*acquire*/,
bool exclusive,
@@ -108,16 +106,22 @@ ReplicatingSubscription::ReplicatingSubscription(
const string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments
-) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
+) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
position(0), ready(false), cancelled(false),
haBroker(hb),
primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
-{
+{}
+
+// Called in subscription's connection thread when the subscription is created.
+// Separate from ctor because we need to use shared_from_this
+//
+void ReplicatingSubscription::initialize() {
try {
FieldTable ft;
- if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
- throw Exception("Replicating subscription does not have broker info: " + tag);
+ if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
+ throw InvalidArgumentException(
+ logPrefix+"Can't subscribe, no broker info: "+getTag());
info.assign(ft);
// Set a log prefix message that identifies the remote broker.
@@ -147,10 +151,17 @@ ReplicatingSubscription::ReplicatingSubscription(
// However we must attach the observer _before_ we snapshot for
// initial dequeues to be sure we don't miss any dequeues
// between the snapshot and attaching the observer.
- observer.reset(new QueueObserver(*this));
- queue->addObserver(observer);
- ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot();
- std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
+ queue->addObserver(
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
+ boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
+ // There may be no snapshot if the queue is being deleted concurrently.
+ if (!snapshot) {
+ queue->removeObserver(
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
+ throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
+ }
+ ReplicationIdSet primaryIds = snapshot->snapshot();
+ std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
ReplicationIdSet backupIds;
if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
@@ -172,23 +183,7 @@ ReplicatingSubscription::ReplicatingSubscription(
<< ", on backup " << skip);
checkReady(l);
}
- }
- catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Creation error: " << e.what()
- << ": arguments=" << getArguments());
- throw;
- }
-}
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
-
-// Called in subscription's connection thread when the subscription is created.
-// Called separate from ctor because sending events requires
-// shared_from_this
-//
-void ReplicatingSubscription::initialize() {
- try {
if (primary) primary->addReplica(*this);
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues to the backup.
@@ -196,12 +191,14 @@ void ReplicatingSubscription::initialize() {
sendDequeueEvent(l);
}
catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
- << ": arguments=" << getArguments());
+ QPID_LOG(error, logPrefix << "Subscribe failed: " << e.what());
throw;
}
}
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+
// True if the next position for the ReplicatingSubscription is a guarded position.
bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
return position+1 >= guard->getFirst();
@@ -258,7 +255,8 @@ void ReplicatingSubscription::cancel()
}
QPID_LOG(debug, logPrefix << "Cancelled");
if (primary) primary->removeReplica(*this);
- getQueue()->removeObserver(observer);
+ getQueue()->removeObserver(
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
guard->cancel();
ConsumerImpl::cancel();
}
@@ -289,8 +287,9 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
-void ReplicatingSubscription::dequeued(ReplicationId id)
+void ReplicatingSubscription::dequeued(const broker::Message& m)
{
+ ReplicationId id = m.getReplicationId();
QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
{
Mutex::ScopedLock l(lock);