diff options
Diffstat (limited to 'cpp/src/qpid/ha/TxReplicator.cpp')
-rw-r--r-- | cpp/src/qpid/ha/TxReplicator.cpp | 192 |
1 files changed, 192 insertions, 0 deletions
diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp new file mode 100644 index 0000000000..31c68dfe45 --- /dev/null +++ b/cpp/src/qpid/ha/TxReplicator.cpp @@ -0,0 +1,192 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "TxReplicator.h" +#include "Role.h" +#include "Backup.h" +#include "BrokerReplicator.h" +#include "Event.h" +#include "HaBroker.h" +#include "types.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/TxBuffer.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/framing/BufferTypes.h" +#include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> +#include <boost/bind.hpp> +#include <boost/make_shared.hpp> + +namespace qpid { +namespace ha { + +using namespace std; +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; + +namespace { +const string QPID_HA(QPID_HA_PREFIX); +const string TYPE_NAME(QPID_HA+"tx-queue-replicator"); +const string PREFIX(TRANSACTION_REPLICATOR_PREFIX); +} // namespace + + +bool TxReplicator::isTxQueue(const string& q) { + return startsWith(q, PREFIX); +} + +string TxReplicator::getTxId(const string& q) { + assert(isTxQueue(q)); + return q.substr(PREFIX.size()); +} + +string TxReplicator::getType() const { return TYPE_NAME; } + +TxReplicator::TxReplicator( + HaBroker& hb, + const boost::shared_ptr<broker::Queue>& txQueue, + const boost::shared_ptr<broker::Link>& link) : + QueueReplicator(hb, txQueue, link), + id(getTxId(txQueue->getName())), + txBuffer(new broker::TxBuffer), + broker(hb.getBroker()), + store(broker.hasStore() ? &broker.getStore() : 0), + dequeueState(hb.getBroker().getQueues()) +{ + logPrefix = "Backup of transaction "+id+": "; + + if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded.")); + boost::shared_ptr<Backup> backup = dynamic_pointer_cast<Backup>(hb.getRole()); + if (!backup) throw Exception(QPID_MSG(logPrefix << "Broker is not in backup mode.")); + brokerReplicator = backup->getBrokerReplicator(); + + // Dispatch transaction events. + dispatch[TxEnqueueEvent::KEY] = + boost::bind(&TxReplicator::enqueue, this, _1, _2); + dispatch[TxDequeueEvent::KEY] = + boost::bind(&TxReplicator::dequeue, this, _1, _2); + dispatch[TxPrepareEvent::KEY] = + boost::bind(&TxReplicator::prepare, this, _1, _2); + dispatch[TxCommitEvent::KEY] = + boost::bind(&TxReplicator::commit, this, _1, _2); + dispatch[TxRollbackEvent::KEY] = + boost::bind(&TxReplicator::rollback, this, _1, _2); +} + +void TxReplicator::deliver(const broker::Message& m_) { + // Deliver message to the target queue, not the tx-queue. + broker::Message m(m_); + m.setReplicationId(enq.id); // Use replicated id. + boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(enq.queue); + QPID_LOG(trace, logPrefix << "Deliver " << LogMessageId(*queue, m)); + DeliverableMessage dm(m, txBuffer.get()); + dm.deliverTo(queue); +} + +void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { + TxEnqueueEvent e; + decodeStr(data, e); + QPID_LOG(trace, logPrefix << "Enqueue: " << e); + enq = e; +} + +void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { + TxDequeueEvent e; + decodeStr(data, e); + QPID_LOG(trace, logPrefix << "Dequeue: " << e); + // NOTE: Backup does not see transactional dequeues until the transaction is + // prepared, then they are all receieved before the prepare event. + // We collect the events here so we can do a single scan of the queue in prepare. + dequeueState.add(e); +} + +void TxReplicator::DequeueState::add(const TxDequeueEvent& event) { + events[event.queue] += event.id; +} + +// Use this function as a seek() predicate to find the dequeued messages. +bool TxReplicator::DequeueState::addRecord( + const broker::Message& m, const boost::shared_ptr<Queue>& queue, + const ReplicationIdSet& rids) +{ + if (rids.contains(m.getReplicationId())) { + // FIXME aconway 2013-07-24: + // - Do we need to acquire before creating a DR? + // - Are the parameters to DeliveryRecord ok? + DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue, + string() /*tag*/, + boost::shared_ptr<Consumer>(), + true /*acquired*/, + false /*accepted*/, + false /*credit.isWindowMode()*/, + 0 /*credit*/); + // Fake record ids, unique within this transaction. + dr.setId(nextId++); + records.push_back(dr); + recordIds += dr.getId(); + } + return false; +} + +void TxReplicator::DequeueState::addRecords(const EventMap::value_type& entry) { + // Process all the dequeues for a single queue, in one pass of seek() + boost::shared_ptr<broker::Queue> q = queues.get(entry.first); + q->seek(cursor, boost::bind(&TxReplicator::DequeueState::addRecord, + this, _1, q, entry.second)); +} + +boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() { + for_each(events.begin(), events.end(), + boost::bind(&TxReplicator::DequeueState::addRecords, this, _1)); + return make_shared<TxAccept>(cref(recordIds), ref(records)); +} + +void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Prepare"); + txBuffer->enlist(dequeueState.makeAccept()); + context = store->begin(); + txBuffer->prepare(context.get()); + // FIXME aconway 2013-07-26: notify the primary of prepare outcome. +} + +void TxReplicator::commit(const string&, sys::Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Commit"); + if (context.get()) store->commit(*context); + txBuffer->commit(); + end(); +} + +void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Rollback"); + if (context.get()) store->abort(*context); + txBuffer->rollback(); + end(); +} + +void TxReplicator::end(){ + // FIXME aconway 2013-07-26: destroying the txqueue (auto-delete?) will + // destroy this via QueueReplicator::destroy +} +}} // namespace qpid::ha |