summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/TxReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/TxReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/TxReplicator.cpp192
1 files changed, 192 insertions, 0 deletions
diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp
new file mode 100644
index 0000000000..31c68dfe45
--- /dev/null
+++ b/cpp/src/qpid/ha/TxReplicator.cpp
@@ -0,0 +1,192 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "TxReplicator.h"
+#include "Role.h"
+#include "Backup.h"
+#include "BrokerReplicator.h"
+#include "Event.h"
+#include "HaBroker.h"
+#include "types.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/framing/BufferTypes.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace {
+const string QPID_HA(QPID_HA_PREFIX);
+const string TYPE_NAME(QPID_HA+"tx-queue-replicator");
+const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
+} // namespace
+
+
+bool TxReplicator::isTxQueue(const string& q) {
+ return startsWith(q, PREFIX);
+}
+
+string TxReplicator::getTxId(const string& q) {
+ assert(isTxQueue(q));
+ return q.substr(PREFIX.size());
+}
+
+string TxReplicator::getType() const { return TYPE_NAME; }
+
+TxReplicator::TxReplicator(
+ HaBroker& hb,
+ const boost::shared_ptr<broker::Queue>& txQueue,
+ const boost::shared_ptr<broker::Link>& link) :
+ QueueReplicator(hb, txQueue, link),
+ id(getTxId(txQueue->getName())),
+ txBuffer(new broker::TxBuffer),
+ broker(hb.getBroker()),
+ store(broker.hasStore() ? &broker.getStore() : 0),
+ dequeueState(hb.getBroker().getQueues())
+{
+ logPrefix = "Backup of transaction "+id+": ";
+
+ if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
+ boost::shared_ptr<Backup> backup = dynamic_pointer_cast<Backup>(hb.getRole());
+ if (!backup) throw Exception(QPID_MSG(logPrefix << "Broker is not in backup mode."));
+ brokerReplicator = backup->getBrokerReplicator();
+
+ // Dispatch transaction events.
+ dispatch[TxEnqueueEvent::KEY] =
+ boost::bind(&TxReplicator::enqueue, this, _1, _2);
+ dispatch[TxDequeueEvent::KEY] =
+ boost::bind(&TxReplicator::dequeue, this, _1, _2);
+ dispatch[TxPrepareEvent::KEY] =
+ boost::bind(&TxReplicator::prepare, this, _1, _2);
+ dispatch[TxCommitEvent::KEY] =
+ boost::bind(&TxReplicator::commit, this, _1, _2);
+ dispatch[TxRollbackEvent::KEY] =
+ boost::bind(&TxReplicator::rollback, this, _1, _2);
+}
+
+void TxReplicator::deliver(const broker::Message& m_) {
+ // Deliver message to the target queue, not the tx-queue.
+ broker::Message m(m_);
+ m.setReplicationId(enq.id); // Use replicated id.
+ boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(enq.queue);
+ QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m));
+ DeliverableMessage dm(m, txBuffer.get());
+ dm.deliverTo(queue);
+}
+
+void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
+ TxEnqueueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Enqueue: " << e);
+ enq = e;
+}
+
+void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
+ TxDequeueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Dequeue: " << e);
+ // NOTE: Backup does not see transactional dequeues until the transaction is
+ // prepared, then they are all receieved before the prepare event.
+ // We collect the events here so we can do a single scan of the queue in prepare.
+ dequeueState.add(e);
+}
+
+void TxReplicator::DequeueState::add(const TxDequeueEvent& event) {
+ events[event.queue] += event.id;
+}
+
+// Use this function as a seek() predicate to find the dequeued messages.
+bool TxReplicator::DequeueState::addRecord(
+ const broker::Message& m, const boost::shared_ptr<Queue>& queue,
+ const ReplicationIdSet& rids)
+{
+ if (rids.contains(m.getReplicationId())) {
+ // FIXME aconway 2013-07-24:
+ // - Do we need to acquire before creating a DR?
+ // - Are the parameters to DeliveryRecord ok?
+ DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue,
+ string() /*tag*/,
+ boost::shared_ptr<Consumer>(),
+ true /*acquired*/,
+ false /*accepted*/,
+ false /*credit.isWindowMode()*/,
+ 0 /*credit*/);
+ // Fake record ids, unique within this transaction.
+ dr.setId(nextId++);
+ records.push_back(dr);
+ recordIds += dr.getId();
+ }
+ return false;
+}
+
+void TxReplicator::DequeueState::addRecords(const EventMap::value_type& entry) {
+ // Process all the dequeues for a single queue, in one pass of seek()
+ boost::shared_ptr<broker::Queue> q = queues.get(entry.first);
+ q->seek(cursor, boost::bind(&TxReplicator::DequeueState::addRecord,
+ this, _1, q, entry.second));
+}
+
+boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() {
+ for_each(events.begin(), events.end(),
+ boost::bind(&TxReplicator::DequeueState::addRecords, this, _1));
+ return make_shared<TxAccept>(cref(recordIds), ref(records));
+}
+
+void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Prepare");
+ txBuffer->enlist(dequeueState.makeAccept());
+ context = store->begin();
+ txBuffer->prepare(context.get());
+ // FIXME aconway 2013-07-26: notify the primary of prepare outcome.
+}
+
+void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Commit");
+ if (context.get()) store->commit(*context);
+ txBuffer->commit();
+ end();
+}
+
+void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Rollback");
+ if (context.get()) store->abort(*context);
+ txBuffer->rollback();
+ end();
+}
+
+void TxReplicator::end(){
+ // FIXME aconway 2013-07-26: destroying the txqueue (auto-delete?) will
+ // destroy this via QueueReplicator::destroy
+}
+}} // namespace qpid::ha