summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/TxBuffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/TxBuffer.h')
-rw-r--r--qpid/cpp/src/qpid/broker/TxBuffer.h148
1 files changed, 148 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.h b/qpid/cpp/src/qpid/broker/TxBuffer.h
new file mode 100644
index 0000000000..2478b78138
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/TxBuffer.h
@@ -0,0 +1,148 @@
+#ifndef QPID_BROKER_TXBUFFER_H
+#define QPID_BROKER_TXBUFFER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxOp.h"
+#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/sys/Mutex.h"
+#include <algorithm>
+#include <functional>
+#include <vector>
+
+
+namespace qpid {
+namespace broker {
+class TransactionObserver;
+
+/**
+ * Represents a single transaction. As such, an instance of this class
+ * will hold a list of operations representing the workload of the
+ * transaction. This work can be committed or rolled back. Committing
+ * is a two-stage process: first all the operations should be
+ * prepared, then if that succeeds they can be committed.
+ *
+ * In the 2pc case, a successful prepare may be followed by either a
+ * commit or a rollback.
+ *
+ * Atomicity of prepare is ensured by using a lower level
+ * transactional facility. This saves explicitly rolling back all the
+ * successfully prepared ops when one of them fails. i.e. we do not
+ * use 2pc internally, we instead ensure that prepare is atomic at a
+ * lower level. This makes individual prepare operations easier to
+ * code.
+ *
+ * Transactions on a messaging broker effect three types of 'action':
+ * (1) updates to persistent storage (2) updates to transient storage
+ * or cached data (3) network writes.
+ *
+ * Of these, (1) should always occur atomically during prepare to
+ * ensure that if the broker crashes while a transaction is being
+ * completed the persistent state (which is all that then remains) is
+ * consistent. (3) can only be done on commit, after a successful
+ * prepare. There is a little more flexibility with (2) but any
+ * changes made during prepare should be subject to the control of the
+ * TransactionalStore in use.
+ *
+ * TxBuffer inherits AsyncCompletion because transactions can be completed
+ * asynchronously if the broker is part of a HA cluster.
+ */
+class TxBuffer : public AsyncCompletion {
+ private:
+ typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
+ std::vector<TxOp::shared_ptr> ops;
+ boost::shared_ptr<TransactionObserver> observer;
+ std::auto_ptr<TransactionContext> txContext;
+ std::string error;
+ sys::Mutex errorLock;
+
+ public:
+ QPID_BROKER_EXTERN TxBuffer();
+
+ /**
+ * Adds an operation to the transaction.
+ */
+ QPID_BROKER_EXTERN void enlist(TxOp::shared_ptr op);
+
+ /**
+ * Requests that all ops are prepared. This should
+ * primarily involve making sure that a persistent record
+ * of the operations is stored where necessary.
+ *
+ * Once prepared, a transaction can be committed (or in
+ * the 2pc case, rolled back).
+ *
+ * @returns true if all the operations prepared
+ * successfully, false if not.
+ */
+ QPID_BROKER_EXTERN bool prepare(TransactionContext* const ctxt);
+
+ /**
+ * Signals that the ops all prepared successfully and can
+ * now commit, i.e. the operation can now be fully carried
+ * out.
+ *
+ * Should only be called after a call to prepare() returns
+ * true.
+ */
+ QPID_BROKER_EXTERN void commit();
+
+ /**
+ * Signals that all ops can be rolled back.
+ *
+ * Should only be called either after a call to prepare()
+ * returns true (2pc) or instead of a prepare call
+ * ('server-local')
+ */
+ QPID_BROKER_EXTERN void rollback();
+
+ /**
+ * Start a local commit - may complete asynchronously.
+ */
+ QPID_BROKER_EXTERN void startCommit(TransactionalStore* const store);
+
+ /** End a commit, called via async completion.
+ *@return encoded result, not used here.
+ */
+ QPID_BROKER_EXTERN std::string endCommit(TransactionalStore* const store);
+
+ QPID_BROKER_EXTERN void setObserver(boost::shared_ptr<TransactionObserver> o) {
+ observer = o;
+ }
+
+ QPID_BROKER_EXTERN boost::shared_ptr<TransactionObserver> getObserver() const {
+ return observer;
+ }
+
+ /** Set an error to be raised from endCommit when the commit completes.
+ * Called from completer threads if we are doing async completion.
+ * This is the only TxBuffer function called outside the IO thread.
+ */
+ QPID_BROKER_EXTERN void setError(const std::string& message);
+};
+
+}} // namespace qpid::broker
+
+
+#endif /*!QPID_BROKER_TXBUFFER_H*/