summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp174
1 files changed, 174 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
new file mode 100644
index 0000000000..0017cc82cd
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -0,0 +1,174 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+#include <sstream>
+
+namespace {
+const std::string QPID_REPLICATOR_("qpid.replicator-");
+const std::string TYPE_NAME("qpid.queue-replicator");
+const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
+}
+
+namespace qpid {
+namespace ha {
+using namespace broker;
+using namespace framing;
+
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
+const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
+
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+ return QPID_REPLICATOR_ + queueName;
+}
+
+QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
+ : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
+{
+ std::stringstream ss;
+ ss << "HA: Backup " << queue->getName() << ": ";
+ logPrefix = ss.str();
+ QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
+}
+
+// This must be separate from the constructor so we can call shared_from_this.
+void QueueReplicator::activate() {
+ // Note this may create a new bridge or use an existing one.
+ queue->getBroker()->getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ queue->getName(), // src
+ getName(), // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ // Include shared_ptr to self to ensure we are not deleted
+ // before initializeBridge is called.
+ boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this())
+ );
+}
+
+QueueReplicator::~QueueReplicator() {}
+
+void QueueReplicator::deactivate() {
+ sys::Mutex::ScopedLock l(lock);
+ queue->getBroker()->getLinks().destroy(
+ link->getHost(), link->getPort(), queue->getName(), getName(), string());
+ QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
+}
+
+// Called in a broker connection thread when the bridge is created.
+// shared_ptr to self ensures we are not deleted before initializeBridge is called.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
+ boost::shared_ptr<QueueReplicator> /*self*/) {
+ sys::Mutex::ScopedLock l(lock);
+ bridgeName = bridge.getName();
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+ framing::FieldTable settings;
+
+ // FIXME aconway 2011-12-09: Failover optimization removed.
+ // There was code here to re-use messages already on the backup
+ // during fail-over. This optimization was removed to simplify
+ // the logic till we get the basic replication stable, it
+ // can be re-introduced later. Last revision with the optimization:
+ // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
+
+ // Clear out any old messages, reset the queue to start replicating fresh.
+ queue->purge();
+ queue->setPosition(0);
+
+ settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ // TODO aconway 2011-12-19: optimize.
+ settings.setInt(QPID_SYNC_FREQUENCY, 1);
+ peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings);
+ peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+ peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName);
+}
+
+namespace {
+template <class T> T decodeContent(Message& m) {
+ std::string content;
+ m.getFrames().getContent(content);
+ Buffer buffer(const_cast<char*>(content.c_str()), content.size());
+ T result;
+ result.decode(buffer);
+ return result;
+}
+}
+
+void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) {
+ // Thread safe: only calls thread safe Queue functions.
+ if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet
+ QueuedMessage message;
+ if (queue->acquireMessageAt(n, message))
+ queue->dequeue(0, message);
+ }
+}
+
+// Called in connection thread of the queues bridge to primary.
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (key == DEQUEUE_EVENT_KEY) {
+ SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
+ //TODO: should be able to optimise the following
+ for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
+ dequeue(*i, l);
+ } else if (key == POSITION_EVENT_KEY) {
+ SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
+ << " to " << position);
+ assert(queue->getPosition() <= position);
+ //TODO aconway 2011-12-14: Optimize this?
+ for (SequenceNumber i = queue->getPosition(); i < position; ++i)
+ dequeue(i,l);
+ queue->setPosition(position);
+ } else {
+ msg.deliverTo(queue);
+ QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ }
+}
+
+// Unused Exchange methods.
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
+std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
+}} // namespace qpid::broker