diff options
author | Alan Conway <aconway@apache.org> | 2012-02-13 23:50:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-13 23:50:18 +0000 |
commit | 057a0635e081848ba59964c4a8e0923e521b7fe6 (patch) | |
tree | cdad9578140d5dbdb2e90525e2ba9cc870ca50b6 /qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | |
parent | cf61dbf9b313f9bd69b392eae8fd8d27d4e609a2 (diff) | |
download | qpid-python-057a0635e081848ba59964c4a8e0923e521b7fe6.tar.gz |
Merge branch 'qpid-3603-4-rebase' into qpid-3603-5qpid-3603-5
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-5@1243748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 292 |
1 files changed, 292 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp new file mode 100644 index 0000000000..e8571cf871 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -0,0 +1,292 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReplicatingSubscription.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/SessionContext.h" +#include "qpid/broker/ConnectionState.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include <sstream> + +namespace qpid { +namespace ha { + +using namespace framing; +using namespace broker; +using namespace std; + +const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); + +namespace { +const string DOLLAR("$"); +const string INTERNAL("-internal"); +} // namespace + +string mask(const string& in) +{ + return DOLLAR + in + INTERNAL; +} + +/* Called by SemanticState::consume to create a consumer */ +boost::shared_ptr<broker::SemanticState::ConsumerImpl> +ReplicatingSubscription::Factory::create( + SemanticState* parent, + const string& name, + Queue::shared_ptr queue, + bool ack, + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments +) { + boost::shared_ptr<ReplicatingSubscription> rs; + if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { + rs.reset(new ReplicatingSubscription( + parent, name, queue, ack, acquire, exclusive, tag, + resumeId, resumeTtl, arguments)); + queue->addObserver(rs); + } + return rs; +} + +ReplicatingSubscription::ReplicatingSubscription( + SemanticState* parent, + const string& name, + Queue::shared_ptr queue, + bool ack, + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments +) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, + resumeId, resumeTtl, arguments), + events(new Queue(mask(name))), + consumer(new DelegatingConsumer(*this)) +{ + stringstream ss; + ss << "HA: Primary: " << getQueue()->getName() << " at " + << parent->getSession().getConnection().getUrl() << ": "; + logPrefix = ss.str(); + + // FIXME aconway 2011-12-09: Failover optimization removed. + // There was code here to re-use messages already on the backup + // during fail-over. This optimization was removed to simplify + // the logic till we get the basic replication stable, it + // can be re-introduced later. Last revision with the optimization: + // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. + + QPID_LOG(debug, logPrefix << "Created backup subscription " << getName()); + + // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 + // so we will start consuming from the lowest numbered message. + // This is incorrect if the sequence number wraps around, but + // this is what all consumers currently do. +} + +// Message is delivered in the subscription's connection thread. +bool ReplicatingSubscription::deliver(QueuedMessage& m) { + // Add position events for the subscribed queue, not for the internal event queue. + if (m.queue && m.queue == getQueue().get()) { + sys::Mutex::ScopedLock l(lock); + assert(position == m.position); + // m.position is the position of the newly enqueued m on the local queue. + // backupPosition is latest position on the backup queue (before enqueueing m.) + assert(m.position > backupPosition); + if (m.position - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + SequenceNumber send(m.position); + --send; // Send the position before m was enqueued. + sendPositionEvent(send, l); + } + backupPosition = m.position; + QPID_LOG(trace, logPrefix << "Replicating message " << m.position); + } + return ConsumerImpl::deliver(m); +} + +ReplicatingSubscription::~ReplicatingSubscription() {} + + +// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg + +// Mark a message completed. May be called by acknowledge or dequeued +void ReplicatingSubscription::complete( + const QueuedMessage& qm, const sys::Mutex::ScopedLock&) +{ + // Handle completions for the subscribed queue, not the internal event queue. + if (qm.queue && qm.queue == getQueue().get()) { + QPID_LOG(trace, logPrefix << "Completed message " << qm.position); + Delayed::iterator i= delayed.find(qm.position); + // The same message can be completed twice, by acknowledged and + // dequeued, remove it from the set so it only gets completed + // once. + if (i != delayed.end()) { + assert(i->second.payload == qm.payload); + qm.payload->getIngressCompletion().finishCompleter(); + delayed.erase(i); + } + } +} + +// Called before we get notified of the message being available and +// under the message lock in the queue. Called in arbitrary connection thread. +void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { + sys::Mutex::ScopedLock l(lock); + // Delay completion + QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position); + qm.payload->getIngressCompletion().startCompleter(); + assert(delayed.find(qm.position) == delayed.end()); + delayed[qm.position] = qm; +} + + +// Function to complete a delayed message, called by cancel() +void ReplicatingSubscription::cancelComplete( + const Delayed::value_type& v, const sys::Mutex::ScopedLock&) +{ + QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position); + v.second.payload->getIngressCompletion().finishCompleter(); +} + +// Called in the subscription's connection thread. +void ReplicatingSubscription::cancel() +{ + getQueue()->removeObserver( + boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); + { + sys::Mutex::ScopedLock l(lock); + QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); + for_each(delayed.begin(), delayed.end(), + boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); + delayed.clear(); + } + ConsumerImpl::cancel(); +} + +// Called on primary in the backups IO thread. +void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { + sys::Mutex::ScopedLock l(lock); + // Finish completion of message, it has been acknowledged by the backup. + complete(msg, l); +} + +// Hide the "queue deleted" error for a ReplicatingSubscription when a +// queue is deleted, this is normal and not an error. +bool ReplicatingSubscription::hideDeletedError() { return true; } + +// Called with lock held. Called in subscription's connection thread. +void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) +{ + QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); + string buf(dequeues.encodedSize(),'\0'); + framing::Buffer buffer(&buf[0], buf.size()); + dequeues.encode(buffer); + dequeues.clear(); + buffer.reset(); + sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); +} + +// Called after the message has been removed from the deque and under +// the messageLock in the queue. Called in arbitrary connection threads. +void ReplicatingSubscription::dequeued(const QueuedMessage& qm) +{ + { + sys::Mutex::ScopedLock l(lock); + QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position); + dequeues.add(qm.position); + // If we have not yet sent this message to the backup, then + // complete it now as it will never be accepted. + if (qm.position > position) complete(qm, l); + } + notify(); // Ensure a call to doDispatch +} + +// Called with lock held. Called in subscription's connection thread. +void ReplicatingSubscription::sendPositionEvent( + SequenceNumber position, const sys::Mutex::ScopedLock&l ) +{ + QPID_LOG(trace, logPrefix << "Sending position " << position + << ", was " << backupPosition); + string buf(backupPosition.encodedSize(),'\0'); + framing::Buffer buffer(&buf[0], buf.size()); + position.encode(buffer); + buffer.reset(); + sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); +} + +void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer, + const sys::Mutex::ScopedLock&) +{ + //generate event message + boost::intrusive_ptr<Message> event = new Message(); + AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize()); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + event->getFrames().append(method); + event->getFrames().append(header); + event->getFrames().append(content); + + DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true); + props->setRoutingKey(key); + // Send the event using the events queue. Consumer is a + // DelegatingConsumer that delegates to *this for everything but + // has an independnet position. We put an event on events and + // dispatch it through ourselves to send it in line with the + // normal browsing messages. + events->deliver(event); + events->dispatch(consumer); +} + + +// Called in subscription's connection thread. +bool ReplicatingSubscription::doDispatch() +{ + { + sys::Mutex::ScopedLock l(lock); + if (!dequeues.empty()) sendDequeueEvent(l); + } + return ConsumerImpl::doDispatch(); +} + +ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} +ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} +bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); } +void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } +bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } +bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } +OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } + +}} // namespace qpid::ha |