diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp new file mode 100644 index 0000000000..0017cc82cd --- /dev/null +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -0,0 +1,174 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueueReplicator.h" +#include "ReplicatingSubscription.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> +#include <sstream> + +namespace { +const std::string QPID_REPLICATOR_("qpid.replicator-"); +const std::string TYPE_NAME("qpid.queue-replicator"); +const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); +} + +namespace qpid { +namespace ha { +using namespace broker; +using namespace framing; + +const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); +const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event"); + +std::string QueueReplicator::replicatorName(const std::string& queueName) { + return QPID_REPLICATOR_ + queueName; +} + +QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) + : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) +{ + std::stringstream ss; + ss << "HA: Backup " << queue->getName() << ": "; + logPrefix = ss.str(); + QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); +} + +// This must be separate from the constructor so we can call shared_from_this. +void QueueReplicator::activate() { + // Note this may create a new bridge or use an existing one. + queue->getBroker()->getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + queue->getName(), // src + getName(), // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + // Include shared_ptr to self to ensure we are not deleted + // before initializeBridge is called. + boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this()) + ); +} + +QueueReplicator::~QueueReplicator() {} + +void QueueReplicator::deactivate() { + sys::Mutex::ScopedLock l(lock); + queue->getBroker()->getLinks().destroy( + link->getHost(), link->getPort(), queue->getName(), getName(), string()); + QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); +} + +// Called in a broker connection thread when the bridge is created. +// shared_ptr to self ensures we are not deleted before initializeBridge is called. +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler, + boost::shared_ptr<QueueReplicator> /*self*/) { + sys::Mutex::ScopedLock l(lock); + bridgeName = bridge.getName(); + framing::AMQP_ServerProxy peer(sessionHandler.out); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + framing::FieldTable settings; + + // FIXME aconway 2011-12-09: Failover optimization removed. + // There was code here to re-use messages already on the backup + // during fail-over. This optimization was removed to simplify + // the logic till we get the basic replication stable, it + // can be re-introduced later. Last revision with the optimization: + // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. + + // Clear out any old messages, reset the queue to start replicating fresh. + queue->purge(); + queue->setPosition(0); + + settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + // TODO aconway 2011-12-19: optimize. + settings.setInt(QPID_SYNC_FREQUENCY, 1); + peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings); + peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); + peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); + QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName); +} + +namespace { +template <class T> T decodeContent(Message& m) { + std::string content; + m.getFrames().getContent(content); + Buffer buffer(const_cast<char*>(content.c_str()), content.size()); + T result; + result.decode(buffer); + return result; +} +} + +void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) { + // Thread safe: only calls thread safe Queue functions. + if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet + QueuedMessage message; + if (queue->acquireMessageAt(n, message)) + queue->dequeue(0, message); + } +} + +// Called in connection thread of the queues bridge to primary. +void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*) +{ + sys::Mutex::ScopedLock l(lock); + if (key == DEQUEUE_EVENT_KEY) { + SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); + //TODO: should be able to optimise the following + for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) + dequeue(*i, l); + } else if (key == POSITION_EVENT_KEY) { + SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() + << " to " << position); + assert(queue->getPosition() <= position); + //TODO aconway 2011-12-14: Optimize this? + for (SequenceNumber i = queue->getPosition(); i < position; ++i) + dequeue(i,l); + queue->setPosition(position); + } else { + msg.deliverTo(queue); + QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + } +} + +// Unused Exchange methods. +bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } +bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } +bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; } +std::string QueueReplicator::getType() const { return TYPE_NAME; } + +}} // namespace qpid::broker |