summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2015-06-25 10:22:51 +0000
committerRobert Gemmell <robbie@apache.org>2015-06-25 10:22:51 +0000
commit32ae758bc2e8fd962b66a4ab6341b14009f1907e (patch)
tree2f4d8174813284a6ea58bb6b7f6520aa92287476 /qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
parent116d91ad7825a98af36a869fc751206fbce0c59f (diff)
parentf7e896076143de4572b4f1f67ef0765125f2498d (diff)
downloadqpid-python-32ae758bc2e8fd962b66a4ab6341b14009f1907e.tar.gz
NO-JIRA: create branch for qpid-cpp 0.34 RC process
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-cpp-0.34-rc@1687469 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp346
1 files changed, 346 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
new file mode 100644
index 0000000000..ca4dd0099f
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Event.h"
+#include "IdSetter.h"
+#include "QueueGuard.h"
+#include "QueueSnapshot.h"
+#include "ReplicatingSubscription.h"
+#include "TxReplicatingSubscription.h"
+#include "Primary.h"
+#include "HaBroker.h"
+#include "qpid/assert.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionContext.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
+#include <sstream>
+
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using namespace std;
+using sys::Mutex;
+using broker::amqp_0_10::MessageTransfer;
+namespace { const string QPID_HA(QPID_HA_PREFIX); }
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION(QPID_HA+"repsub");
+const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info");
+const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids");
+const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep");
+const string ReplicatingSubscription::QPID_TX_REPLICATOR(QPID_HA+"txrep");
+
+/* Called by SemanticState::consume to create a consumer */
+boost::shared_ptr<broker::SemanticState::ConsumerImpl>
+ReplicatingSubscription::Factory::create(
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) {
+ boost::shared_ptr<ReplicatingSubscription> rs;
+ std::string type = arguments.getAsString(QPID_REPLICATING_SUBSCRIPTION);
+ if (type == QPID_QUEUE_REPLICATOR) {
+ rs.reset(new ReplicatingSubscription(
+ haBroker,
+ parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
+ }
+ else if (type == QPID_TX_REPLICATOR) {
+ rs.reset(new TxReplicatingSubscription(
+ haBroker,
+ parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
+ }
+ if (rs) rs->initialize();
+ return rs;
+}
+
+ReplicatingSubscription::ReplicatingSubscription(
+ HaBroker& hb,
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue_,
+ bool ack,
+ bool /*acquire*/,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
+ resumeId, resumeTtl, arguments),
+ logPrefix(hb.logPrefix),
+ position(0), wasStopped(false), ready(false), cancelled(false),
+ haBroker(hb),
+ primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
+{}
+
+// Called in subscription's connection thread when the subscription is created.
+// Separate from ctor because we need to use shared_from_this
+//
+void ReplicatingSubscription::initialize() {
+ try {
+ FieldTable ft;
+ if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
+ throw InvalidArgumentException(
+ logPrefix.get()+"Can't subscribe, no broker info: "+getTag());
+ info.assign(ft);
+
+ // Set a log prefix message that identifies the remote broker.
+ ostringstream os;
+ os << "Subscription to " << queue->getName() << " at ";
+ info.printId(os) << ": ";
+ logPrefix = os.str();
+
+ // If there's already a guard (we are in failover) use it, else create one.
+ if (primary) guard = primary->getGuard(queue, info);
+ if (!guard) guard.reset(new QueueGuard(*queue, info, logPrefix.prePrefix));
+
+ // NOTE: Once the observer is attached we can have concurrent
+ // calls to dequeued so we need to lock use of this->dequeues.
+ //
+ // However we must attach the observer _before_ we snapshot for
+ // initial dequeues to be sure we don't miss any dequeues
+ // between the snapshot and attaching the observer.
+ queue->getObservers().add(
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
+ boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>();
+ // There may be no snapshot if the queue is being deleted concurrently.
+ if (!snapshot) {
+ queue->getObservers().remove(
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
+ throw ResourceDeletedException(logPrefix.get()+"Can't subscribe, queue deleted");
+ }
+ ReplicationIdSet primaryIds = snapshot->getSnapshot();
+ std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
+ ReplicationIdSet backupIds;
+ if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
+
+ // Initial dequeues are messages on backup but not on primary.
+ ReplicationIdSet initDequeues = backupIds - primaryIds;
+ QueuePosition front,back;
+ queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
+ {
+ sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
+ dequeues += initDequeues; // Messages on backup that are not on primary.
+ skipEnqueue = backupIds - initDequeues; // Messages already on the backup.
+ // Queue front is moving but we know this subscriptions will start at a
+ // position >= front so if front is safe then position must be.
+ position = front;
+
+ QPID_LOG(debug, logPrefix << "Subscribed: primary ["
+ << front << "," << back << "]=" << primaryIds
+ << ", guarded " << guard->getFirst()
+ << ", backup (keep " << skipEnqueue << ", drop " << initDequeues << ")");
+ checkReady(l);
+ }
+
+ if (primary) primary->addReplica(*this);
+ Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
+ // Send initial dequeues to the backup.
+ // There must be a shared_ptr(this) when sending.
+ sendDequeueEvent(l);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Subscribe failed: " << e.what());
+ throw;
+ }
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+void ReplicatingSubscription::stopped() {
+ Mutex::ScopedLock l(lock);
+ // We have reached the last available message on the queue.
+ //
+ // Note that if messages have been removed out-of-order this may not be the
+ // head of the queue. We may not even have reached the guard
+ // position. However there are no more messages to protect and we will not
+ // be advanced any further, so we should consider ourselves guarded for
+ // purposes of readiness.
+ wasStopped = true;
+ checkReady(l);
+}
+
+// True if the next position for the ReplicatingSubscription is a guarded position.
+bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
+ // See comment in stopped()
+ return wasStopped || (position+1 >= guard->getFirst());
+}
+
+// Message is delivered in the subscription's connection thread.
+bool ReplicatingSubscription::deliver(
+ const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
+{
+ Mutex::ScopedLock l(lock);
+ ReplicationId id = m.getReplicationId();
+ position = m.getSequence();
+ try {
+ bool result = false;
+ if (skipEnqueue.contains(id)) {
+ QPID_LOG(trace, logPrefix << "Skip " << logMessageId(*getQueue(), m));
+ skipEnqueue -= id;
+ guard->complete(id); // This will never be acknowledged.
+ notify();
+ result = true;
+ }
+ else {
+ QPID_LOG(trace, logPrefix << "Replicated " << logMessageId(*getQueue(), m));
+ if (!ready && !isGuarded(l)) unready += id;
+ sendIdEvent(id, l);
+ result = ConsumerImpl::deliver(c, m);
+ }
+ checkReady(l);
+ return result;
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, logPrefix << "Error replicating " << logMessageId(*getQueue(), m)
+ << ": " << e.what());
+ throw;
+ }
+}
+
+void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
+ if (!ready && isGuarded(l) && unready.empty()) {
+ ready = true;
+ sys::Mutex::ScopedUnlock u(lock);
+ // Notify Primary that a subscription is ready.
+ if (position+1 >= guard->getFirst()) {
+ QPID_LOG(debug, logPrefix << "Caught up at " << position);
+ } else {
+ QPID_LOG(debug, logPrefix << "Caught up at " << position << "short of guard at " << guard->getFirst());
+ }
+
+ if (primary) primary->readyReplica(*this);
+ }
+}
+
+// Called in the subscription's connection thread.
+void ReplicatingSubscription::cancel()
+{
+ {
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return;
+ cancelled = true;
+ }
+ QPID_LOG(debug, logPrefix << "Cancelled");
+ if (primary) primary->removeReplica(*this);
+ getQueue()->getObservers().remove(
+ boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
+ guard->cancel();
+ ConsumerImpl::cancel();
+}
+
+// Consumer override, called on primary in the backup's IO thread.
+void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
+ // Finish completion of message, it has been acknowledged by the backup.
+ ReplicationId id = r.getReplicationId();
+ QPID_LOG(trace, logPrefix << "Acknowledged " <<
+ logMessageId(*getQueue(), r.getMessageId(), id));
+ guard->complete(id);
+ {
+ Mutex::ScopedLock l(lock);
+ unready -= id;
+ checkReady(l);
+ }
+ ConsumerImpl::acknowledged(r);
+}
+
+// Called with lock held. Called in subscription's connection thread.
+void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
+{
+ ReplicationIdSet oldDequeues = dequeues;
+ dequeues -= skipDequeue; // Don't send skipped dequeues
+ skipDequeue -= oldDequeues; // Forget dequeues that would have been sent.
+ if (dequeues.empty()) return;
+ QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
+ sendEvent(DequeueEvent(dequeues), l);
+}
+
+// Called after the message has been removed
+// from the deque and under the messageLock in the queue. Called in
+// arbitrary connection threads.
+void ReplicatingSubscription::dequeued(const broker::Message& m)
+{
+ ReplicationId id = m.getReplicationId();
+ QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
+ {
+ Mutex::ScopedLock l(lock);
+ dequeues.add(id);
+ }
+ notify(); // Ensure a call to doDispatch
+}
+
+
+// Called with lock held. Called in subscription's connection thread.
+void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
+{
+ sendEvent(IdEvent(pos), l);
+}
+
+void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&)
+{
+ Mutex::ScopedUnlock u(lock);
+ // Send the event directly to the base consumer implementation. The dummy
+ // consumer prevents acknowledgements being handled, which is what we want
+ // for events
+ ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr<Consumer>());
+}
+
+// Called in subscription's connection thread.
+bool ReplicatingSubscription::doDispatch()
+{
+ {
+ Mutex::ScopedLock l(lock);
+ if (!dequeues.empty()) sendDequeueEvent(l);
+ }
+ try {
+ return ConsumerImpl::doDispatch();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
+ return false;
+ }
+}
+
+void ReplicatingSubscription::skipEnqueues(const ReplicationIdSet& ids) {
+ Mutex::ScopedLock l(lock);
+ skipEnqueue += ids;
+}
+
+void ReplicatingSubscription::skipDequeues(const ReplicationIdSet& ids) {
+ Mutex::ScopedLock l(lock);
+ skipDequeue += ids;
+}
+
+}} // namespace qpid::ha