summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/TxReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/TxReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp273
1 files changed, 273 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
new file mode 100644
index 0000000000..33adc9780d
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -0,0 +1,273 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "TxReplicator.h"
+#include "Role.h"
+#include "Backup.h"
+#include "BrokerReplicator.h"
+#include "Event.h"
+#include "HaBroker.h"
+#include "ReplicatingSubscription.h"
+#include "types.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/framing/BufferTypes.h"
+#include "qpid/log/Statement.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using qpid::broker::amqp_0_10::MessageTransfer;
+using qpid::types::Uuid;
+
+namespace {
+const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
+} // namespace
+
+bool TxReplicator::isTxQueue(const string& q) {
+ return startsWith(q, PREFIX);
+}
+
+Uuid TxReplicator::getTxId(const string& q) {
+ if (TxReplicator::isTxQueue(q)) {
+ std::istringstream is(q);
+ is.seekg(PREFIX.size());
+ Uuid id;
+ is >> id;
+ if (!is.fail()) return id;
+ }
+ throw Exception(QPID_MSG("Invalid tx queue: " << q));
+}
+
+string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; }
+
+boost::shared_ptr<TxReplicator> TxReplicator::create(
+ HaBroker& hb,
+ const boost::shared_ptr<broker::Queue>& txQueue,
+ const boost::shared_ptr<broker::Link>& link)
+{
+ boost::shared_ptr<TxReplicator> tr(new TxReplicator(hb, txQueue, link));
+ tr->initialize();
+ return tr;
+}
+
+TxReplicator::TxReplicator(
+ HaBroker& hb,
+ const boost::shared_ptr<broker::Queue>& txQueue,
+ const boost::shared_ptr<broker::Link>& link) :
+ QueueReplicator(hb, txQueue, link),
+ logPrefix(hb.logPrefix),
+ store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
+ channel(link->nextChannel()),
+ empty(true), ended(false),
+ dequeueState(hb.getBroker().getQueues())
+{
+ logPrefix = "Backup of TX "+shortStr(getTxId(txQueue->getName()))+": ";
+ QPID_LOG(debug, logPrefix << "Started");
+ if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
+
+ // Dispatch transaction events.
+ dispatch[TxEnqueueEvent::KEY] =
+ boost::bind(&TxReplicator::enqueue, this, _1, _2);
+ dispatch[TxDequeueEvent::KEY] =
+ boost::bind(&TxReplicator::dequeue, this, _1, _2);
+ dispatch[TxPrepareEvent::KEY] =
+ boost::bind(&TxReplicator::prepare, this, _1, _2);
+ dispatch[TxCommitEvent::KEY] =
+ boost::bind(&TxReplicator::commit, this, _1, _2);
+ dispatch[TxRollbackEvent::KEY] =
+ boost::bind(&TxReplicator::rollback, this, _1, _2);
+ dispatch[TxBackupsEvent::KEY] =
+ boost::bind(&TxReplicator::backups, this, _1, _2);
+}
+
+TxReplicator::~TxReplicator() {
+ link->returnChannel(channel);
+}
+
+// Send a message to the primary tx.
+void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLock&) {
+ assert(sessionHandler);
+ const MessageTransfer& transfer(MessageTransfer::get(msg));
+ for (FrameSet::const_iterator i = transfer.getFrames().begin();
+ i != transfer.getFrames().end();
+ ++i)
+ {
+ sessionHandler->out.handle(const_cast<AMQFrame&>(*i));
+ }
+}
+
+void TxReplicator::deliver(const broker::Message& m_) {
+ boost::intrusive_ptr<broker::TxBuffer> txbuf;
+ broker::Message m(m_);
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
+ txbuf = txBuffer;
+ m.setReplicationId(enq.id); // Use enqueued replicated id.
+ }
+ // Deliver message to the target queue, not the tx-queue.
+ boost::shared_ptr<broker::Queue> queue = haBroker.getBroker().getQueues().get(enq.queue);
+ QPID_LOG(trace, logPrefix << "Deliver " << logMessageId(*queue, m.getReplicationId()));
+ DeliverableMessage dm(m, txbuf.get());
+ dm.deliverTo(queue);
+}
+
+void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
+ sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
+ TxEnqueueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Enqueue: " << e);
+ enq = e;
+ empty = false;
+}
+
+void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
+ sys::Mutex::ScopedLock l(lock);
+ if (!txBuffer) return;
+ TxDequeueEvent e;
+ decodeStr(data, e);
+ QPID_LOG(trace, logPrefix << "Dequeue: " << e);
+ // NOTE: Backup does not see transactional dequeues until the transaction is
+ // prepared, then they are all receieved before the prepare event.
+ // We collect the events here so we can do a single scan of the queue in prepare.
+ dequeueState.add(e);
+ empty = false;
+}
+
+void TxReplicator::DequeueState::add(const TxDequeueEvent& event) {
+ events[event.queue] += event.id;
+}
+
+// Use this function as a seek() predicate to find the dequeued messages.
+bool TxReplicator::DequeueState::addRecord(
+ const broker::Message& m, const boost::shared_ptr<Queue>& queue,
+ const ReplicationIdSet& rids)
+{
+ if (rids.contains(m.getReplicationId())) {
+ DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue,
+ string() /*tag*/,
+ boost::shared_ptr<Consumer>(),
+ true /*acquired*/,
+ false /*accepted*/,
+ false /*credit.isWindowMode()*/,
+ 0 /*credit*/);
+ // Generate record ids, unique within this transaction.
+ dr.setId(nextId++);
+ records.push_back(dr);
+ recordIds += dr.getId();
+ }
+ return false;
+}
+
+void TxReplicator::DequeueState::addRecords(const EventMap::value_type& entry) {
+ // Process all the dequeues for a single queue, in one pass of seek()
+ boost::shared_ptr<broker::Queue> q = queues.get(entry.first);
+ q->seek(cursor, boost::bind(&TxReplicator::DequeueState::addRecord,
+ this, _1, q, entry.second));
+}
+
+boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() {
+ for_each(events.begin(), events.end(),
+ boost::bind(&TxReplicator::DequeueState::addRecords, this, _1));
+ return boost::shared_ptr<TxAccept>(
+ new TxAccept(boost::cref(recordIds), boost::ref(records)));
+}
+
+void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
+ if (!txBuffer) return;
+ txBuffer->enlist(dequeueState.makeAccept());
+ context = store->begin();
+ if (txBuffer->prepare(context.get())) {
+ QPID_LOG(debug, logPrefix << "Local prepare OK");
+ sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l);
+ } else {
+ QPID_LOG(error, logPrefix << "Local prepare failed");
+ sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l);
+ }
+}
+
+void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
+ if (!txBuffer) return;
+ QPID_LOG(debug, logPrefix << "Commit");
+ if (context.get()) store->commit(*context);
+ txBuffer->commit();
+ end(l);
+}
+
+void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
+ if (!txBuffer) return;
+ // Don't bleat about rolling back empty transactions, this happens all the time
+ // when a session closes and rolls back its outstanding transaction.
+ if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
+ if (context.get()) store->abort(*context);
+ txBuffer->rollback();
+ end(l);
+}
+
+void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) {
+ TxBackupsEvent e;
+ decodeStr(data, e);
+ if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) {
+ QPID_LOG(info, logPrefix << "Not participating");
+ end(l);
+ } else {
+ QPID_LOG(debug, logPrefix << "Backups: " << e.backups);
+ txBuffer = new broker::TxBuffer;
+ }
+}
+
+void TxReplicator::end(sys::Mutex::ScopedLock&) {
+ ended = true;
+ txBuffer = 0;
+ // QueueReplicator::destroy cancels subscription to the primary tx-queue
+ // which allows the primary to clean up resources.
+ sys::Mutex::ScopedUnlock u(lock);
+ QueueReplicator::destroy();
+}
+
+// Called when the tx queue is deleted.
+void TxReplicator::destroy(sys::Mutex::ScopedLock& l) {
+ if (!ended) {
+ if (!empty) QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback");
+ rollback(string(), l);
+ }
+ QueueReplicator::destroy(l);
+}
+
+}} // namespace qpid::ha