summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp')
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp428
1 files changed, 428 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
new file mode 100644
index 0000000000..04780e83e8
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
@@ -0,0 +1,428 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <exception>
+#include <malloc.h>
+#include <memory.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/windows/check.h>
+
+#include "TransactionLog.h"
+#include "Transaction.h"
+#include "Lsn.h"
+
+namespace {
+
+// Structures that hold log records. Each has a type field at the start.
+enum TransactionEntryType {
+ TransactionStartDtxEntry = 1,
+ TransactionStartTxEntry = 2,
+ TransactionPrepareEntry = 3,
+ TransactionCommitEntry = 4,
+ TransactionAbortEntry = 5,
+ TransactionDeleteEntry = 6
+};
+// The only thing that really takes up space in transaction records is the
+// xid. Max xid length is in the neighborhood of 600 bytes. Leave some room.
+static const uint32_t MaxTransactionContentLength = 1024;
+
+// Dtx-Start
+struct TransactionStartDtx {
+ TransactionEntryType type;
+ uint32_t length;
+ char content[MaxTransactionContentLength];
+
+ TransactionStartDtx()
+ : type(TransactionStartDtxEntry), length(0) {}
+};
+// Tx-Start
+struct TransactionStartTx {
+ TransactionEntryType type;
+
+ TransactionStartTx()
+ : type(TransactionStartTxEntry) {}
+};
+// Prepare
+struct TransactionPrepare {
+ TransactionEntryType type;
+
+ TransactionPrepare()
+ : type(TransactionPrepareEntry) {}
+};
+// Commit
+struct TransactionCommit {
+ TransactionEntryType type;
+
+ TransactionCommit()
+ : type(TransactionCommitEntry) {}
+};
+// Abort
+struct TransactionAbort {
+ TransactionEntryType type;
+
+ TransactionAbort()
+ : type(TransactionAbortEntry) {}
+};
+// Delete
+struct TransactionDelete {
+ TransactionEntryType type;
+
+ TransactionDelete()
+ : type(TransactionDeleteEntry) {}
+};
+
+} // namespace
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+void
+TransactionLog::initialize()
+{
+ // Write something to occupy the first record, preventing a real
+ // transaction from being lsn/id 0. Delete of a non-existant id is easily
+ // tossed during recovery if no other transactions have caused the tail
+ // to be moved up past this dummy record by then.
+ deleteTransaction(0);
+}
+
+uint32_t
+TransactionLog::marshallingBufferSize()
+{
+ size_t biggestNeed = sizeof(TransactionStartDtx);
+ uint32_t defSize = static_cast<uint32_t>(biggestNeed);
+ uint32_t minSize = Log::marshallingBufferSize();
+ if (defSize <= minSize)
+ return minSize;
+ // Round up to multiple of minSize
+ return (defSize + minSize) / minSize * minSize;
+}
+
+// Get a new Transaction
+boost::shared_ptr<Transaction>
+TransactionLog::begin()
+{
+ TransactionStartTx entry;
+ CLFS_LSN location;
+ uint64_t id;
+ uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+ location = write(&entry, entryLength);
+ try {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ id = lsnToId(location);
+ std::auto_ptr<Transaction> t(new Transaction(id, shared_from_this()));
+ boost::shared_ptr<Transaction> t2(t);
+ boost::weak_ptr<Transaction> weak_t2(t2);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[id] = weak_t2;
+ }
+ return t2;
+ }
+ catch(...) {
+ deleteTransaction(id);
+ throw;
+ }
+}
+
+// Get a new TPCTransaction
+boost::shared_ptr<TPCTransaction>
+TransactionLog::begin(const std::string& xid)
+{
+ TransactionStartDtx entry;
+ CLFS_LSN location;
+ uint64_t id;
+ uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+ entry.length = static_cast<uint32_t>(xid.length());
+ memcpy_s(entry.content, sizeof(entry.content),
+ xid.c_str(), xid.length());
+ entryLength -= (sizeof(entry.content) - entry.length);
+ location = write(&entry, entryLength);
+ try {
+ id = lsnToId(location);
+ std::auto_ptr<TPCTransaction> t(new TPCTransaction(id,
+ shared_from_this(),
+ xid));
+ boost::shared_ptr<TPCTransaction> t2(t);
+ boost::weak_ptr<Transaction> weak_t2(t2);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[id] = weak_t2;
+ }
+ return t2;
+ }
+ catch(...) {
+ deleteTransaction(id);
+ throw;
+ }
+}
+
+void
+TransactionLog::recordPrepare(uint64_t transId)
+{
+ TransactionPrepare entry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&entry, sizeof(entry), &transLsn);
+}
+
+void
+TransactionLog::recordCommit(uint64_t transId)
+{
+ TransactionCommit entry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&entry, sizeof(entry), &transLsn);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[transId].reset();
+ }
+}
+
+void
+TransactionLog::recordAbort(uint64_t transId)
+{
+ TransactionAbort entry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&entry, sizeof(entry), &transLsn);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[transId].reset();
+ }
+}
+
+void
+TransactionLog::deleteTransaction(uint64_t transId)
+{
+ uint64_t newFirstId = 0;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds.erase(transId);
+ // May have deleted the first entry; if so the log can release that.
+ // If this deletion results in an empty list of transactions,
+ // move the tail up to this transaction's LSN. This may result in
+ // one or more transactions being stranded in the log until there's
+ // more activity. If a restart happens while these unneeded log
+ // records are there, the presence of the TransactionDelete
+ // entry will cause them to be ignored anyway.
+ if (validIds.empty())
+ newFirstId = transId;
+ else if (validIds.begin()->first > transId)
+ newFirstId = validIds.begin()->first;
+ }
+ TransactionDelete deleteEntry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&deleteEntry, sizeof(deleteEntry), &transLsn);
+ if (newFirstId != 0)
+ moveTail(idToLsn(newFirstId));
+}
+
+void
+TransactionLog::collectPreparedXids(std::map<std::string, TPCTransaction::shared_ptr>& preparedMap)
+{
+ // Go through all the known transactions; if the transaction is still
+ // valid (open or prepared) it will have weak_ptr to the Transaction.
+ // If it can be downcast and has a state of TRANS_PREPARED, add to the map.
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ std::map<uint64_t, boost::weak_ptr<Transaction> >::const_iterator i;
+ for (i = validIds.begin(); i != validIds.end(); ++i) {
+ Transaction::shared_ptr t = i->second.lock();
+ if (t.get() == 0)
+ continue;
+ TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t));
+ if (tpct.get() == 0)
+ continue;
+ if (tpct->state == Transaction::TRANS_PREPARED)
+ preparedMap[tpct->getXid()] = tpct;
+ }
+}
+
+void
+TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap)
+{
+ QPID_LOG(debug, "Recovering transaction log");
+
+ // Note that there may be transaction refs in the log which are deleted,
+ // so be sure to only add transactions at Start records, and ignore those
+ // that don't have an existing message record.
+ // Get the base LSN - that's how to say "start reading at the beginning"
+ CLFS_INFORMATION info;
+ ULONG infoLength = sizeof (info);
+ BOOL ok = ::GetLogFileInformation(handle, &info, &infoLength);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Pointers for the various record types that can be assigned in the
+ // reading loop below.
+ TransactionStartDtx *startDtxEntry;
+ TransactionStartTx *startTxEntry;
+
+ PVOID recordPointer;
+ ULONG recordLength;
+ CLFS_RECORD_TYPE recordType = ClfsDataRecord;
+ CLFS_LSN transLsn, current, undoNext;
+ PVOID readContext;
+ uint64_t transId;
+ // Note 'current' in case it's needed below; ReadNextLogRecord returns it
+ // via a parameter.
+ current = info.BaseLsn;
+ ok = ::ReadLogRecord(marshal,
+ &info.BaseLsn,
+ ClfsContextForward,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ &undoNext,
+ &transLsn,
+ &readContext,
+ 0);
+
+ std::auto_ptr<Transaction> tPtr;
+ std::auto_ptr<TPCTransaction> tpcPtr;
+ while (ok) {
+ std::string xid;
+
+ // All the record types this class writes have a TransactionEntryType
+ // in the beginning. Based on that, do what's needed.
+ TransactionEntryType *t =
+ reinterpret_cast<TransactionEntryType *>(recordPointer);
+ switch(*t) {
+ case TransactionStartDtxEntry:
+ startDtxEntry =
+ reinterpret_cast<TransactionStartDtx *>(recordPointer);
+ transId = lsnToId(current);
+ QPID_LOG(debug, "Dtx start, id " << transId);
+ xid.assign(startDtxEntry->content, startDtxEntry->length);
+ tpcPtr.reset(new TPCTransaction(transId, shared_from_this(), xid));
+ transMap[transId] = boost::shared_ptr<TPCTransaction>(tpcPtr);
+ break;
+ case TransactionStartTxEntry:
+ startTxEntry =
+ reinterpret_cast<TransactionStartTx *>(recordPointer);
+ transId = lsnToId(current);
+ QPID_LOG(debug, "Tx start, id " << transId);
+ tPtr.reset(new Transaction(transId, shared_from_this()));
+ transMap[transId] = boost::shared_ptr<Transaction>(tPtr);
+ break;
+ case TransactionPrepareEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Dtx prepare, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Dtx " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_PREPARED;
+ }
+ break;
+ case TransactionCommitEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Txn commit, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Txn " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_COMMITTED;
+ }
+ break;
+ case TransactionAbortEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Txn abort, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Txn " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_ABORTED;
+ }
+ break;
+ case TransactionDeleteEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Txn delete, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Txn " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_DELETED;
+ transMap.erase(transId);
+ }
+ break;
+ default:
+ throw std::runtime_error("Bad transaction log entry type");
+ }
+
+ recordType = ClfsDataRecord;
+ ok = ::ReadNextLogRecord(readContext,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ 0, // No userLsn
+ &undoNext,
+ &transLsn,
+ &current,
+ 0);
+ }
+ DWORD status = ::GetLastError();
+ ::TerminateReadLog(readContext);
+ if (status != ERROR_HANDLE_EOF) // No more records
+ throw QPID_WINDOWS_ERROR(status);
+
+ QPID_LOG(debug, "Transaction log recovered");
+
+ // At this point we have a list of all the not-deleted transactions that
+ // were in existence when the broker last ran. All transactions of both
+ // Dtx and Tx types that haven't prepared or committed will be aborted.
+ // This will give the proper background against which to decide each
+ // message's disposition when recovering messages that were involved in
+ // transactions.
+ // In addition to recovering and aborting transactions, rebuild the
+ // validIds map now that we know which ids are really valid.
+ std::map<uint64_t, Transaction::shared_ptr>::const_iterator i;
+ for (i = transMap.begin(); i != transMap.end(); ++i) {
+ switch(i->second->state) {
+ case Transaction::TRANS_OPEN:
+ QPID_LOG(debug, "Txn " << i->first << " was open; aborted");
+ i->second->state = Transaction::TRANS_ABORTED;
+ break;
+ case Transaction::TRANS_ABORTED:
+ QPID_LOG(debug, "Txn " << i->first << " was aborted");
+ break;
+ case Transaction::TRANS_COMMITTED:
+ QPID_LOG(debug, "Txn " << i->first << " was committed");
+ break;
+ case Transaction::TRANS_PREPARED:
+ QPID_LOG(debug, "Txn " << i->first << " was prepared");
+ break;
+ case Transaction::TRANS_DELETED:
+ QPID_LOG(error,
+ "Txn " << i->first << " was deleted; shouldn't be here");
+ break;
+ }
+ boost::weak_ptr<Transaction> weak_txn(i->second);
+ validIds[i->first] = weak_txn;
+ }
+}
+
+}}} // namespace qpid::store::ms_clfs