summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/TxReplicator.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/TxReplicator.h')
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h136
1 files changed, 136 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
new file mode 100644
index 0000000000..c7599d21b1
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -0,0 +1,136 @@
+#ifndef QPID_HA_TRANSACTIONREPLICATOR_H
+#define QPID_HA_TRANSACTIONREPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "LogPrefix.h"
+#include "QueueReplicator.h"
+#include "Event.h"
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/types/Uuid.h"
+
+namespace qpid {
+
+namespace broker {
+class TxBuffer;
+class TxAccept;
+class DtxBuffer;
+class Broker;
+class MessageStore;
+class Deliverable;
+}
+
+namespace ha {
+class BrokerReplicator;
+
+/**
+ * Exchange created on a backup broker to replicate a transaction on the primary.
+ *
+ * Subscribes to a tx-queue like a normal queue but puts replicated messages and
+ * transaction events into a local TxBuffer.
+ *
+ * THREAD SAFE: Called in different connection threads.
+ */
+class TxReplicator : public QueueReplicator {
+ public:
+ typedef boost::shared_ptr<broker::Queue> QueuePtr;
+ typedef boost::shared_ptr<broker::Link> LinkPtr;
+
+ static bool isTxQueue(const std::string& queue);
+ static types::Uuid getTxId(const std::string& queue);
+
+ static boost::shared_ptr<TxReplicator> create(
+ HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
+
+ ~TxReplicator();
+
+ std::string getType() const;
+
+ // QueueReplicator overrides
+ using QueueReplicator::destroy;
+ void destroy(sys::Mutex::ScopedLock&);
+
+ protected:
+
+ void deliver(const broker::Message&);
+
+ private:
+
+ typedef void (TxReplicator::*DispatchFunction)(
+ const std::string&, sys::Mutex::ScopedLock&);
+ typedef qpid::sys::unordered_map<std::string, DispatchFunction> DispatchMap;
+ typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> DequeueMap;
+
+ TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
+ void sendMessage(const broker::Message&, sys::Mutex::ScopedLock&);
+ void enqueue(const std::string& data, sys::Mutex::ScopedLock&);
+ void dequeue(const std::string& data, sys::Mutex::ScopedLock&);
+ void prepare(const std::string& data, sys::Mutex::ScopedLock&);
+ void commit(const std::string& data, sys::Mutex::ScopedLock&);
+ void rollback(const std::string& data, sys::Mutex::ScopedLock&);
+ void backups(const std::string& data, sys::Mutex::ScopedLock&);
+ void end(sys::Mutex::ScopedLock&);
+
+ LogPrefix2 logPrefix;
+ TxEnqueueEvent enq; // Enqueue data for next deliver.
+ boost::intrusive_ptr<broker::TxBuffer> txBuffer;
+ broker::MessageStore* store;
+ std::auto_ptr<broker::TransactionContext> context;
+ framing::ChannelId channel; // Channel to send prepare-complete.
+ bool empty, ended;
+
+ // Class to process dequeues and create DeliveryRecords to populate a
+ // TxAccept.
+ class DequeueState {
+ public:
+ DequeueState(broker::QueueRegistry& qr) : queues(qr) {}
+ void add(const TxDequeueEvent&);
+ boost::shared_ptr<broker::TxAccept> makeAccept();
+
+ private:
+ // Delivery record IDs are command IDs from the session.
+ // On a backup we will just fake these Ids.
+ typedef framing::SequenceNumber Id;
+ typedef framing::SequenceSet IdSet;
+ typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> EventMap;
+
+ bool addRecord(const broker::Message& m,
+ const boost::shared_ptr<broker::Queue>&,
+ const ReplicationIdSet& );
+ void addRecords(const DequeueMap::value_type& entry);
+
+ broker::QueueRegistry& queues;
+ EventMap events;
+ broker::DeliveryRecords records;
+ broker::QueueCursor cursor;
+ framing::SequenceNumber nextId;
+ IdSet recordIds;
+ };
+ DequeueState dequeueState;
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_TRANSACTIONREPLICATOR_H*/