summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/store/ms-clfs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/store/ms-clfs')
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Log.cpp182
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Log.h78
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Lsn.h36
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp1121
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp406
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h107
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp472
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Messages.h144
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp83
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Transaction.h146
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp428
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h104
12 files changed, 3307 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
new file mode 100644
index 0000000000..e6cb10c133
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <clfsmgmtw32.h>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <stdlib.h>
+#include <qpid/sys/windows/check.h>
+
+#include "Log.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+Log::~Log()
+{
+ if (marshal != 0)
+ ::DeleteLogMarshallingArea(marshal);
+ ::CloseHandle(handle);
+}
+
+void
+Log::open(const std::string& path, const TuningParameters& params)
+{
+ this->containerSize = static_cast<ULONGLONG>(params.containerSize);
+ logPath = path;
+ std::string logSpec = "log:" + path;
+ size_t specLength = logSpec.length();
+ std::auto_ptr<wchar_t> wLogSpec(new wchar_t[specLength + 1]);
+ size_t converted;
+ mbstowcs_s(&converted,
+ wLogSpec.get(), specLength+1,
+ logSpec.c_str(), specLength);
+ handle = ::CreateLogFile(wLogSpec.get(),
+ GENERIC_WRITE | GENERIC_READ,
+ 0,
+ 0,
+ OPEN_ALWAYS,
+ 0);
+ QPID_WINDOWS_CHECK_NOT(handle, INVALID_HANDLE_VALUE);
+ CLFS_INFORMATION info;
+ ULONG infoSize = sizeof(info);
+ BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ ok = ::RegisterManageableLogClient(handle, 0);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Set up policies for how many containers to initially create and how
+ // large each container should be. Also, auto-grow the log when container
+ // space runs out.
+ CLFS_MGMT_POLICY logPolicy;
+ logPolicy.Version = CLFS_MGMT_POLICY_VERSION;
+ logPolicy.LengthInBytes = sizeof(logPolicy);
+ logPolicy.PolicyFlags = 0;
+
+ // If this is the first time this log is opened, give an opportunity to
+ // initialize its content.
+ bool needInitialize(false);
+ if (info.TotalContainers == 0) {
+ // New log; set the configured container size and create the
+ // initial set of containers.
+ logPolicy.PolicyType = ClfsMgmtPolicyNewContainerSize;
+ logPolicy.PolicyParameters.NewContainerSize.SizeInBytes = containerSize;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ ULONGLONG desired(params.containers), actual(0);
+ ok = ::SetLogFileSizeWithPolicy(handle, &desired, &actual);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ needInitialize = true;
+ }
+ // Ensure that the log is extended as needed and will shrink when 50%
+ // becomes unused.
+ logPolicy.PolicyType = ClfsMgmtPolicyAutoGrow;
+ logPolicy.PolicyParameters.AutoGrow.Enabled = 1;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ logPolicy.PolicyType = ClfsMgmtPolicyAutoShrink;
+ logPolicy.PolicyParameters.AutoShrink.Percentage = params.shrinkPct;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Need a marshaling area
+ ok = ::CreateLogMarshallingArea(handle,
+ NULL, NULL, NULL, // Alloc, free, context
+ marshallingBufferSize(),
+ params.maxWriteBuffers,
+ 1, // Max read buffers
+ &marshal);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ if (needInitialize)
+ initialize();
+}
+
+uint32_t
+Log::marshallingBufferSize()
+{
+ // Default implementation returns the minimum marshalling buffer size;
+ // derived ones should come up with a more fitting value.
+ //
+ // Find the directory name part of the log specification, including the
+ // trailing '\'.
+ size_t dirMarker = logPath.rfind('\\');
+ if (dirMarker == std::string::npos)
+ dirMarker = logPath.rfind('/');
+ DWORD bytesPerSector;
+ DWORD dontCare;
+ ::GetDiskFreeSpace(logPath.substr(0, dirMarker).c_str(),
+ &dontCare,
+ &bytesPerSector,
+ &dontCare,
+ &dontCare);
+ return bytesPerSector;
+}
+
+CLFS_LSN
+Log::write(void* entry, uint32_t length, CLFS_LSN* prev)
+{
+ CLFS_WRITE_ENTRY desc;
+ desc.Buffer = entry;
+ desc.ByteLength = length;
+ CLFS_LSN lsn;
+ BOOL ok = ::ReserveAndAppendLog(marshal,
+ &desc, 1, // Buffer descriptor
+ 0, prev, // Undo-Next, Prev
+ 0, 0, // Reservation
+ CLFS_FLAG_FORCE_FLUSH,
+ &lsn,
+ 0);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ return lsn;
+}
+
+// Get the current base LSN of the log.
+CLFS_LSN
+Log::getBase()
+{
+ CLFS_INFORMATION info;
+ ULONG infoSize = sizeof(info);
+ BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ return info.BaseLsn;
+}
+
+void
+Log::moveTail(const CLFS_LSN& oldest)
+{
+ BOOL ok = ::AdvanceLogBase(marshal,
+ const_cast<PCLFS_LSN>(&oldest),
+ 0, NULL);
+ // If multiple threads are manipulating things they may get out of
+ // order when moving the tail; if someone already moved it further
+ // than this, it's ok - ignore it.
+ if (ok || ::GetLastError() == ERROR_LOG_START_OF_LOG)
+ return;
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.h b/qpid/cpp/src/qpid/store/ms-clfs/Log.h
new file mode 100644
index 0000000000..2f7eb6cada
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.h
@@ -0,0 +1,78 @@
+#ifndef QPID_STORE_MSCLFS_LOG_H
+#define QPID_STORE_MSCLFS_LOG_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <windows.h>
+#include <clfsw32.h>
+#include <qpid/sys/IntegerTypes.h>
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+/**
+ * @class Log
+ *
+ * Represents a CLFS-housed log.
+ */
+class Log {
+
+protected:
+ HANDLE handle;
+ ULONGLONG containerSize;
+ std::string logPath;
+ PVOID marshal;
+
+ // Give subclasses a chance to initialize a new log. Called after a new
+ // log is created, initial set of containers is added, and marshalling
+ // area is allocated.
+ virtual void initialize() {}
+
+public:
+ struct TuningParameters {
+ size_t containerSize;
+ unsigned short containers;
+ unsigned short shrinkPct;
+ uint32_t maxWriteBuffers;
+ };
+
+ Log() : handle(INVALID_HANDLE_VALUE), containerSize(0), marshal(0) {}
+ virtual ~Log();
+
+ void open(const std::string& path, const TuningParameters& params);
+
+ virtual uint32_t marshallingBufferSize();
+
+ CLFS_LSN write(void* entry, uint32_t length, CLFS_LSN* prev = 0);
+
+ // Get the current base LSN of the log.
+ CLFS_LSN getBase();
+
+ // Move the log tail to the indicated LSN.
+ void moveTail(const CLFS_LSN& oldest);
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_LOG_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h b/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h
new file mode 100644
index 0000000000..7f46c1f266
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Lsn.h
@@ -0,0 +1,36 @@
+#ifndef QPID_STORE_MSCLFS_LSN_H
+#define QPID_STORE_MSCLFS_LSN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <clfsw32.h>
+
+namespace {
+ // Make it easy to assign LSNs
+ inline CLFS_LSN idToLsn(const uint64_t val)
+ { CLFS_LSN lsn; lsn.Internal = val; return lsn; }
+
+ inline uint64_t lsnToId(const CLFS_LSN& lsn)
+ { uint64_t val = lsn.Internal; return val; }
+}
+
+#endif /* QPID_STORE_MSCLFS_LSN_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
new file mode 100644
index 0000000000..586aaaf980
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
@@ -0,0 +1,1121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <list>
+#include <map>
+#include <set>
+#include <stdlib.h>
+#include <string>
+#include <windows.h>
+#include <clfsw32.h>
+#include <qpid/broker/RecoverableQueue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/store/MessageStorePlugin.h>
+#include <qpid/store/StoreException.h>
+#include <qpid/store/StorageProvider.h>
+#include <qpid/sys/Mutex.h>
+#include <boost/foreach.hpp>
+#include <boost/make_shared.hpp>
+
+// From ms-sql...
+#include "BlobAdapter.h"
+#include "BlobRecordset.h"
+#include "BindingRecordset.h"
+#include "DatabaseConnection.h"
+#include "Exception.h"
+#include "State.h"
+#include "VariantHelper.h"
+using qpid::store::ms_sql::BlobAdapter;
+using qpid::store::ms_sql::BlobRecordset;
+using qpid::store::ms_sql::BindingRecordset;
+using qpid::store::ms_sql::DatabaseConnection;
+using qpid::store::ms_sql::ADOException;
+using qpid::store::ms_sql::State;
+using qpid::store::ms_sql::VariantHelper;
+
+#include "Log.h"
+#include "Messages.h"
+#include "Transaction.h"
+#include "TransactionLog.h"
+
+// Bring in ADO 2.8 (yes, I know it says "15", but that's it...)
+#import "C:\Program Files\Common Files\System\ado\msado15.dll" \
+ no_namespace rename("EOF", "EndOfFile")
+#include <comdef.h>
+namespace {
+inline void TESTHR(HRESULT x) {if FAILED(x) _com_issue_error(x);};
+
+// Table names
+const std::string TblBinding("tblBinding");
+const std::string TblConfig("tblConfig");
+const std::string TblExchange("tblExchange");
+const std::string TblQueue("tblQueue");
+
+}
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+/**
+ * @class MSSqlClfsProvider
+ *
+ * Implements a qpid::store::StorageProvider that uses a hybrid Microsoft
+ * SQL Server and Windows CLFS approach as the backend data store for Qpid.
+ */
+class MSSqlClfsProvider : public qpid::store::StorageProvider
+{
+protected:
+ void finalizeMe();
+
+ void dump();
+
+public:
+ MSSqlClfsProvider();
+ ~MSSqlClfsProvider();
+
+ virtual qpid::Options* getOptions() { return &options; }
+
+ virtual void earlyInitialize (Plugin::Target& target);
+ virtual void initialize(Plugin::Target& target);
+
+ /**
+ * Receive notification that this provider is the one that will actively
+ * handle provider storage for the target. If the provider is to be used,
+ * this method will be called after earlyInitialize() and before any
+ * recovery operations (recovery, in turn, precedes call to initialize()).
+ */
+ virtual void activate(MessageStorePlugin &store);
+
+ /**
+ * @name Methods inherited from qpid::broker::MessageStore
+ */
+ //@{
+ /**
+ * If called after init() but before recovery, will discard the database
+ * and reinitialize using an empty store dir. If @a pushDownStoreFiles
+ * is true, the content of the store dir will be moved to a backup dir
+ * inside the store dir. This is used when cluster nodes recover and must
+ * get their content from a cluster sync rather than directly from the
+ * store.
+ *
+ * @param pushDownStoreFiles If true, will move content of the store dir
+ * into a subdir, leaving the store dir
+ * otherwise empty.
+ */
+ virtual void truncateInit(const bool pushDownStoreFiles = false);
+
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(PersistableQueue& queue);
+
+ /**
+ * Record the existence of a durable exchange
+ */
+ virtual void create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args);
+ /**
+ * Destroy a durable exchange
+ */
+ virtual void destroy(const PersistableExchange& exchange);
+
+ /**
+ * Record a binding
+ */
+ virtual void bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
+ /**
+ * Record generic durable configuration
+ */
+ virtual void create(const PersistableConfig& config);
+
+ /**
+ * Destroy generic durable configuration
+ */
+ virtual void destroy(const PersistableConfig& config);
+
+ /**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(PersistableMessage& msg);
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data);
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * Note: that this is async so the return of the function does
+ * not mean the opperation is complete.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue);
+
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: this is a no-op for this provider.
+ *
+ * @param queue the name of the queue from which it is to be dequeued
+ */
+ virtual void flush(const PersistableQueue& queue) {};
+
+ /**
+ * Returns the number of outstanding AIO's for a given queue
+ *
+ * If 0, than all the enqueue / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue)
+ {return 0;}
+ //@}
+
+ /**
+ * @name Methods inherited from qpid::broker::TransactionalStore
+ */
+ //@{
+ virtual std::auto_ptr<qpid::broker::TransactionContext> begin();
+ virtual std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+ virtual void prepare(qpid::broker::TPCTransactionContext& txn);
+ virtual void commit(qpid::broker::TransactionContext& txn);
+ virtual void abort(qpid::broker::TransactionContext& txn);
+ virtual void collectPreparedXids(std::set<std::string>& xids);
+ //@}
+
+ virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer);
+ virtual void recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap);
+ virtual void recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap);
+ virtual void recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap);
+ virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap);
+ virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap);
+
+private:
+ struct ProviderOptions : public qpid::Options
+ {
+ std::string connectString;
+ std::string catalogName;
+ std::string storeDir;
+ size_t containerSize;
+ unsigned short initialContainers;
+ uint32_t maxWriteBuffers;
+
+ ProviderOptions(const std::string &name)
+ : qpid::Options(name),
+ catalogName("QpidStore"),
+ containerSize(1024 * 1024),
+ initialContainers(2),
+ maxWriteBuffers(10)
+ {
+ const enum { NAMELEN = MAX_COMPUTERNAME_LENGTH + 1 };
+ TCHAR myName[NAMELEN];
+ DWORD myNameLen = NAMELEN;
+ GetComputerName(myName, &myNameLen);
+ connectString = "Data Source=";
+ connectString += myName;
+ connectString += "\\SQLEXPRESS;Integrated Security=SSPI";
+ addOptions()
+ ("connect",
+ qpid::optValue(connectString, "STRING"),
+ "Connection string for the database to use. Will prepend "
+ "Provider=SQLOLEDB;")
+ ("catalog",
+ qpid::optValue(catalogName, "DB NAME"),
+ "Catalog (database) name")
+ ("store-dir",
+ qpid::optValue(storeDir, "DIR"),
+ "Location to store message and transaction data "
+ "(default uses data-dir if available)")
+ ("container-size",
+ qpid::optValue(containerSize, "VALUE"),
+ "Bytes per container; min 512K. Only used when creating "
+ "a new log")
+ ("initial-containers",
+ qpid::optValue(initialContainers, "VALUE"),
+ "Number of containers to add if creating a new log")
+ ("max-write-buffers",
+ qpid::optValue(maxWriteBuffers, "VALUE"),
+ "Maximum write buffers outstanding before log is flushed "
+ "(0 means no limit)")
+ ;
+ }
+ };
+ ProviderOptions options;
+ std::string brokerDataDir;
+ Messages messages;
+ // TransactionLog requires itself to have a shared_ptr reference to start.
+ TransactionLog::shared_ptr transactions;
+
+ // Each thread has a separate connection to the database and also needs
+ // to manage its COM initialize/finalize individually. This is done by
+ // keeping a thread-specific State.
+ boost::thread_specific_ptr<State> dbState;
+
+ State *initState();
+ DatabaseConnection *initConnection(void);
+ void createDb(DatabaseConnection *db, const std::string &name);
+ void createLogs();
+};
+
+static MSSqlClfsProvider static_instance_registers_plugin;
+
+void
+MSSqlClfsProvider::finalizeMe()
+{
+ dbState.reset();
+}
+
+MSSqlClfsProvider::MSSqlClfsProvider()
+ : options("MS SQL/CLFS Provider options")
+{
+ transactions = boost::make_shared<TransactionLog>();
+}
+
+MSSqlClfsProvider::~MSSqlClfsProvider()
+{
+}
+
+void
+MSSqlClfsProvider::earlyInitialize(Plugin::Target &target)
+{
+ MessageStorePlugin *store = dynamic_cast<MessageStorePlugin *>(&target);
+ if (store) {
+ // Check the store dir option; if not specified, need to
+ // grab the broker's data dir.
+ if (options.storeDir.empty()) {
+ DataDir& dir = store->getBroker()->getDataDir();
+ if (dir.isEnabled()) {
+ options.storeDir = dir.getPath();
+ }
+ else {
+ QPID_LOG(error,
+ "MSSQL-CLFS: --store-dir required if --no-data-dir specified");
+ return;
+ }
+ }
+
+ // If CLFS is not available on this system, give up now.
+ try {
+ Log::TuningParameters params;
+ params.containerSize = options.containerSize;
+ params.containers = options.initialContainers;
+ params.shrinkPct = 50;
+ params.maxWriteBuffers = options.maxWriteBuffers;
+ std::string msgPath = options.storeDir + "\\" + "messages";
+ messages.openLog(msgPath, params);
+ std::string transPath = options.storeDir + "\\" + "transactions";
+ transactions->open(transPath, params);
+ }
+ catch (std::exception &e) {
+ QPID_LOG(error, e.what());
+ return;
+ }
+
+ // If the database init fails, report it and don't register; give
+ // the rest of the broker a chance to run.
+ //
+ // Don't try to initConnection() since that will fail if the
+ // database doesn't exist. Instead, try to open a connection without
+ // a database name, then search for the database. There's still a
+ // chance this provider won't be selected for the store too, so be
+ // be sure to close the database connection before return to avoid
+ // leaving a connection up that will not be used.
+ try {
+ initState(); // This initializes COM
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection());
+ db->open(options.connectString, "");
+ _ConnectionPtr conn(*db);
+ _RecordsetPtr pCatalogs = NULL;
+ VariantHelper<std::string> catalogName(options.catalogName);
+ pCatalogs = conn->OpenSchema(adSchemaCatalogs, catalogName);
+ if (pCatalogs->EndOfFile) {
+ // Database doesn't exist; create it
+ QPID_LOG(notice,
+ "MSSQL-CLFS: Creating database " + options.catalogName);
+ createDb(db.get(), options.catalogName);
+ }
+ else {
+ QPID_LOG(notice,
+ "MSSQL-CLFS: Database located: " + options.catalogName);
+ }
+ if (pCatalogs) {
+ if (pCatalogs->State == adStateOpen)
+ pCatalogs->Close();
+ pCatalogs = 0;
+ }
+ db->close();
+ store->providerAvailable("MSSQL-CLFS", this);
+ }
+ catch (qpid::Exception &e) {
+ QPID_LOG(error, e.what());
+ return;
+ }
+ store->addFinalizer(boost::bind(&MSSqlClfsProvider::finalizeMe, this));
+ }
+}
+
+void
+MSSqlClfsProvider::initialize(Plugin::Target& target)
+{
+}
+
+void
+MSSqlClfsProvider::activate(MessageStorePlugin &store)
+{
+ QPID_LOG(info, "MS SQL/CLFS Provider is up");
+}
+
+void
+MSSqlClfsProvider::truncateInit(const bool pushDownStoreFiles)
+{
+}
+
+void
+MSSqlClfsProvider::create(PersistableQueue& queue,
+ const qpid::framing::FieldTable& /*args needed for jrnl*/)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ try {
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsQueues.add(queue);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating queue " + queue.getName(), e, errs);
+ }
+ catch(std::exception& e) {
+ db->rollbackTransaction();
+ THROW_STORE_EXCEPTION(e.what());
+ }
+}
+
+/**
+ * Destroy a durable queue
+ */
+void
+MSSqlClfsProvider::destroy(PersistableQueue& queue)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsQueues.open(db, TblQueue);
+ rsBindings.open(db, TblBinding);
+ // Remove bindings first; the queue IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForQueue(queue.getPersistenceId());
+ rsQueues.remove(queue);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting queue " + queue.getName(), e, errs);
+ }
+
+ /*
+ * Now that the SQL stuff has recorded the queue deletion, expunge
+ * all record of the queue from the messages set. Any errors logging
+ * these removals are swallowed because during a recovery the queue
+ * Id won't be present (the SQL stuff already committed) so any references
+ * to it in message operations will be removed.
+ */
+ messages.expunge(queue.getPersistenceId());
+}
+
+/**
+ * Record the existence of a durable exchange
+ */
+void
+MSSqlClfsProvider::create(const PersistableExchange& exchange,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ try {
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsExchanges.add(exchange);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating exchange " + exchange.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Destroy a durable exchange
+ */
+void
+MSSqlClfsProvider::destroy(const PersistableExchange& exchange)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsExchanges.open(db, TblExchange);
+ rsBindings.open(db, TblBinding);
+ // Remove bindings first; the exchange IDs can't be ripped out from
+ // under the references in the bindings table.
+ rsBindings.removeForExchange(exchange.getPersistenceId());
+ rsExchanges.remove(exchange);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting exchange " + exchange.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Record a binding
+ */
+void
+MSSqlClfsProvider::bind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.add(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error binding exchange " + exchange.getName() +
+ " to queue " + queue.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Forget a binding
+ */
+void
+MSSqlClfsProvider::unbind(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ try {
+ db->beginTransaction();
+ rsBindings.open(db, TblBinding);
+ rsBindings.remove(exchange.getPersistenceId(),
+ queue.getPersistenceId(),
+ key,
+ args);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error unbinding exchange " + exchange.getName() +
+ " from queue " + queue.getName(),
+ e,
+ errs);
+ }
+}
+
+/**
+ * Record generic durable configuration
+ */
+void
+MSSqlClfsProvider::create(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ try {
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.add(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error creating config " + config.getName(), e, errs);
+ }
+}
+
+/**
+ * Destroy generic durable configuration
+ */
+void
+MSSqlClfsProvider::destroy(const PersistableConfig& config)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ try {
+ db->beginTransaction();
+ rsConfigs.open(db, TblConfig);
+ rsConfigs.remove(config);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting config " + config.getName(), e, errs);
+ }
+}
+
+/**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+void
+MSSqlClfsProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg)
+{
+#if 0
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.add(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error staging message", e, errs);
+ }
+#endif
+}
+
+/**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+void
+MSSqlClfsProvider::destroy(PersistableMessage& msg)
+{
+#if 0
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.remove(msg);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error deleting message", e, errs);
+ }
+#endif
+}
+
+/**
+ * Appends content to a previously staged message
+ */
+void
+MSSqlClfsProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data)
+{
+#if 0
+ DatabaseConnection *db = initConnection();
+ MessageRecordset rsMessages;
+ try {
+ db->beginTransaction();
+ rsMessages.open(db, TblMessage);
+ rsMessages.append(msg, data);
+ db->commitTransaction();
+ }
+ catch(_com_error &e) {
+ std::string errs = db->getErrors();
+ db->rollbackTransaction();
+ throw ADOException("Error appending to message", e, errs);
+ }
+#endif
+}
+
+/**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+void
+MSSqlClfsProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
+ const boost::intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ // Message log keeps all messages in one log, so we don't need the
+ // queue reference.
+ messages.loadContent(msg->getPersistenceId(), data, offset, length);
+}
+
+/**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param ctxt The transaction context under which this enqueue happens.
+ * @param msg The message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ */
+void
+MSSqlClfsProvider::enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ Transaction::shared_ptr t;
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt);
+ if (ctx)
+ t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(ctxt);
+ if (tctx)
+ t = tctx->getTransaction();
+ }
+ uint64_t msgId = msg->getPersistenceId();
+ if (msgId == 0) {
+ messages.add(msg);
+ msgId = msg->getPersistenceId();
+ }
+ messages.enqueue(msgId, queue.getPersistenceId(), t);
+ msg->enqueueComplete();
+}
+
+/**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param ctxt The transaction context under which this dequeue happens.
+ * @param msg The message to dequeue
+ * @param queue The queue from which it is to be dequeued
+ */
+void
+MSSqlClfsProvider::dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+{
+ Transaction::shared_ptr t;
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(ctxt);
+ if (ctx)
+ t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(ctxt);
+ if (tctx)
+ t = tctx->getTransaction();
+ }
+ messages.dequeue(msg->getPersistenceId(), queue.getPersistenceId(), t);
+ msg->dequeueComplete();
+}
+
+std::auto_ptr<qpid::broker::TransactionContext>
+MSSqlClfsProvider::begin()
+{
+ Transaction::shared_ptr t = transactions->begin();
+ std::auto_ptr<qpid::broker::TransactionContext> tc(new TransactionContext(t));
+ return tc;
+}
+
+std::auto_ptr<qpid::broker::TPCTransactionContext>
+MSSqlClfsProvider::begin(const std::string& xid)
+{
+ TPCTransaction::shared_ptr t = transactions->begin(xid);
+ std::auto_ptr<qpid::broker::TPCTransactionContext> tc(new TPCTransactionContext(t));
+ return tc;
+}
+
+void
+MSSqlClfsProvider::prepare(qpid::broker::TPCTransactionContext& txn)
+{
+ TPCTransactionContext *ctx = dynamic_cast<TPCTransactionContext*> (&txn);
+ if (ctx == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ ctx->getTransaction()->prepare();
+}
+
+void
+MSSqlClfsProvider::commit(qpid::broker::TransactionContext& txn)
+{
+ Transaction::shared_ptr t;
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(&txn);
+ if (ctx)
+ t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(&txn);
+ if (tctx == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ t = tctx->getTransaction();
+ }
+ t->commit(messages);
+}
+
+void
+MSSqlClfsProvider::abort(qpid::broker::TransactionContext& txn)
+{
+ Transaction::shared_ptr t;
+ TransactionContext *ctx = dynamic_cast<TransactionContext*>(&txn);
+ if (ctx)
+ t = ctx->getTransaction();
+ else {
+ TPCTransactionContext *tctx;
+ tctx = dynamic_cast<TPCTransactionContext*>(&txn);
+ if (tctx == 0)
+ throw qpid::broker::InvalidTransactionContextException();
+ t = tctx->getTransaction();
+ }
+ t->abort(messages);
+}
+
+void
+MSSqlClfsProvider::collectPreparedXids(std::set<std::string>& xids)
+{
+ std::map<std::string, TPCTransaction::shared_ptr> preparedMap;
+ transactions->collectPreparedXids(preparedMap);
+ std::map<std::string, TPCTransaction::shared_ptr>::const_iterator i;
+ for (i = preparedMap.begin(); i != preparedMap.end(); ++i) {
+ xids.insert(i->first);
+ }
+}
+
+// @TODO Much of this recovery code is way too similar... refactor to
+// a recover template method on BlobRecordset.
+
+void
+MSSqlClfsProvider::recoverConfigs(qpid::broker::RecoveryManager& recoverer)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsConfigs;
+ rsConfigs.open(db, TblConfig);
+ _RecordsetPtr p = (_RecordsetPtr)rsConfigs;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Config instance and reset its ID.
+ broker::RecoverableConfig::shared_ptr config =
+ recoverer.recoverConfig(blob);
+ config->setPersistenceId(id);
+ p->MoveNext();
+ }
+}
+
+void
+MSSqlClfsProvider::recoverExchanges(qpid::broker::RecoveryManager& recoverer,
+ ExchangeMap& exchangeMap)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsExchanges;
+ rsExchanges.open(db, TblExchange);
+ _RecordsetPtr p = (_RecordsetPtr)rsExchanges;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Exchange instance, reset its ID, and remember the
+ // ones restored for matching up when recovering bindings.
+ broker::RecoverableExchange::shared_ptr exchange =
+ recoverer.recoverExchange(blob);
+ exchange->setPersistenceId(id);
+ exchangeMap[id] = exchange;
+ p->MoveNext();
+ }
+}
+
+void
+MSSqlClfsProvider::recoverQueues(qpid::broker::RecoveryManager& recoverer,
+ QueueMap& queueMap)
+{
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ rsQueues.open(db, TblQueue);
+ _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+ if (p->BOF && p->EndOfFile)
+ return; // Nothing to do
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ long blobSize = p->Fields->Item["fieldTableBlob"]->ActualSize;
+ BlobAdapter blob(blobSize);
+ blob = p->Fields->Item["fieldTableBlob"]->GetChunk(blobSize);
+ // Recreate the Queue instance and reset its ID.
+ broker::RecoverableQueue::shared_ptr queue =
+ recoverer.recoverQueue(blob);
+ queue->setPersistenceId(id);
+ queueMap[id] = queue;
+ p->MoveNext();
+ }
+}
+
+void
+MSSqlClfsProvider::recoverBindings(qpid::broker::RecoveryManager& recoverer,
+ const ExchangeMap& exchangeMap,
+ const QueueMap& queueMap)
+{
+ DatabaseConnection *db = initConnection();
+ BindingRecordset rsBindings;
+ rsBindings.open(db, TblBinding);
+ rsBindings.recover(recoverer, exchangeMap, queueMap);
+}
+
+void
+MSSqlClfsProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer,
+ MessageMap& messageMap,
+ MessageQueueMap& messageQueueMap)
+{
+ // Read the list of valid queue Ids to ensure that no broken msg->queue
+ // refs get restored.
+ DatabaseConnection *db = initConnection();
+ BlobRecordset rsQueues;
+ rsQueues.open(db, TblQueue);
+ _RecordsetPtr p = (_RecordsetPtr)rsQueues;
+ std::set<uint64_t> validQueues;
+ if (!(p->BOF && p->EndOfFile)) {
+ p->MoveFirst();
+ while (!p->EndOfFile) {
+ uint64_t id = p->Fields->Item["persistenceId"]->Value;
+ validQueues.insert(id);
+ p->MoveNext();
+ }
+ }
+ std::map<uint64_t, Transaction::shared_ptr> transMap;
+ transactions->recover(transMap);
+ messages.recover(recoverer,
+ validQueues,
+ transMap,
+ messageMap,
+ messageQueueMap);
+}
+
+void
+MSSqlClfsProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer,
+ PreparedTransactionMap& dtxMap)
+{
+ std::map<std::string, TPCTransaction::shared_ptr> preparedMap;
+ transactions->collectPreparedXids(preparedMap);
+ std::map<std::string, TPCTransaction::shared_ptr>::const_iterator i;
+ for (i = preparedMap.begin(); i != preparedMap.end(); ++i) {
+ std::auto_ptr<TPCTransactionContext> ctx(new TPCTransactionContext(i->second));
+ std::auto_ptr<qpid::broker::TPCTransactionContext> brokerCtx(ctx);
+ dtxMap[i->first] = recoverer.recoverTransaction(i->first, brokerCtx);
+ }
+}
+
+////////////// Internal Methods
+
+State *
+MSSqlClfsProvider::initState()
+{
+ State *state = dbState.get(); // See if thread has initialized
+ if (!state) {
+ state = new State;
+ dbState.reset(state);
+ }
+ return state;
+}
+
+DatabaseConnection *
+MSSqlClfsProvider::initConnection(void)
+{
+ State *state = initState();
+ if (state->dbConn != 0)
+ return state->dbConn; // And the DatabaseConnection is set up too
+ std::auto_ptr<DatabaseConnection> db(new DatabaseConnection);
+ db->open(options.connectString, options.catalogName);
+ state->dbConn = db.release();
+ return state->dbConn;
+}
+
+void
+MSSqlClfsProvider::createDb(DatabaseConnection *db, const std::string &name)
+{
+ const std::string dbCmd = "CREATE DATABASE " + name;
+ const std::string useCmd = "USE " + name;
+ const std::string tableCmd = "CREATE TABLE ";
+ const std::string colSpecs =
+ " (persistenceId bigint PRIMARY KEY NOT NULL IDENTITY(1,1),"
+ " fieldTableBlob varbinary(MAX) NOT NULL)";
+ const std::string bindingSpecs =
+ " (exchangeId bigint REFERENCES tblExchange(persistenceId) NOT NULL,"
+ " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL,"
+ " routingKey varchar(255),"
+ " fieldTableBlob varbinary(MAX))";
+
+ _variant_t unused;
+ _bstr_t dbStr = dbCmd.c_str();
+ _ConnectionPtr conn(*db);
+ try {
+ conn->Execute(dbStr, &unused, adExecuteNoRecords);
+ _bstr_t useStr = useCmd.c_str();
+ conn->Execute(useStr, &unused, adExecuteNoRecords);
+ std::string makeTable = tableCmd + TblQueue + colSpecs;
+ _bstr_t makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblExchange + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblConfig + colSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ makeTable = tableCmd + TblBinding + bindingSpecs;
+ makeTableStr = makeTable.c_str();
+ conn->Execute(makeTableStr, &unused, adExecuteNoRecords);
+ }
+ catch(_com_error &e) {
+ throw ADOException("MSSQL can't create " + name, e, db->getErrors());
+ }
+}
+
+void
+MSSqlClfsProvider::dump()
+{
+ // dump all db records to qpid_log
+ QPID_LOG(notice, "DB Dump: (not dumping anything)");
+ // rsQueues.dump();
+}
+
+
+}}} // namespace qpid::store::ms_sql
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
new file mode 100644
index 0000000000..14d63a4cd4
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp
@@ -0,0 +1,406 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <exception>
+#include <malloc.h>
+#include <memory.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/windows/check.h>
+
+#include "MessageLog.h"
+#include "Lsn.h"
+
+namespace {
+
+// Structures that hold log records. Each has a type field at the start.
+enum MessageEntryType {
+ MessageStartEntry = 1,
+ MessageChunkEntry = 2,
+ MessageDeleteEntry = 3,
+ MessageEnqueueEntry = 4,
+ MessageDequeueEntry = 5
+};
+static const uint32_t MaxMessageContentLength = 64 * 1024;
+
+// Message-Start
+struct MessageStart {
+ MessageEntryType type;
+ // If the complete message encoding doesn't fit, remainder is in
+ // MessageChunk records to follow.
+ // headerLength is the size of the message's header in content. It is
+ // part of the totalLength and the segmentLength.
+ uint32_t headerLength;
+ uint32_t totalLength;
+ uint32_t segmentLength;
+ char content[MaxMessageContentLength];
+
+ MessageStart()
+ : type(MessageStartEntry),
+ headerLength(0),
+ totalLength(0),
+ segmentLength(0) {}
+};
+// Message-Chunk
+struct MessageChunk {
+ MessageEntryType type;
+ uint32_t segmentLength;
+ char content[MaxMessageContentLength];
+
+ MessageChunk() : type(MessageChunkEntry), segmentLength(0) {}
+};
+// Message-Delete
+struct MessageDelete {
+ MessageEntryType type;
+
+ MessageDelete() : type(MessageDeleteEntry) {}
+};
+// Message-Enqueue
+struct MessageEnqueue {
+ MessageEntryType type;
+ uint64_t queueId;
+ uint64_t transId;
+
+ MessageEnqueue(uint64_t qId = 0, uint64_t tId = 0)
+ : type(MessageEnqueueEntry), queueId(qId), transId(tId) {}
+};
+// Message-Dequeue
+struct MessageDequeue {
+ MessageEntryType type;
+ uint64_t queueId;
+ uint64_t transId;
+
+ MessageDequeue(uint64_t qId = 0, uint64_t tId = 0)
+ : type(MessageDequeueEntry), queueId(qId), transId(tId) {}
+};
+
+} // namespace
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+void
+MessageLog::initialize()
+{
+ // Write something to occupy the first record, preventing a real message
+ // from being lsn/id 0. Delete of a non-existant id is easily tossed
+ // during recovery if no other messages have caused the tail to be moved
+ // up past this dummy record by then.
+ deleteMessage(0, 0);
+}
+
+uint32_t
+MessageLog::marshallingBufferSize()
+{
+ size_t biggestNeed = std::max(sizeof(MessageStart), sizeof(MessageEnqueue));
+ uint32_t defSize = static_cast<uint32_t>(biggestNeed);
+ uint32_t minSize = Log::marshallingBufferSize();
+ if (defSize <= minSize)
+ return minSize;
+ // Round up to multiple of minSize
+ return (defSize + minSize) / minSize * minSize;
+}
+
+uint64_t
+MessageLog::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+{
+ // The message may be too long to fit in one record; if so, write
+ // Message-Chunk records to contain the rest. If it does all fit in one
+ // record, though, optimize the encoding by going straight to the
+ // Message-Start record rather than encoding then copying to the record.
+ // In all case
+ MessageStart entry;
+ uint32_t encodedMessageLength = msg->encodedSize();
+ entry.headerLength = msg->encodedHeaderSize();
+ entry.totalLength = encodedMessageLength;
+ CLFS_LSN location, lastChunkLsn;
+ std::auto_ptr<char> encodeStage;
+ char *encodeBuff = 0;
+ bool oneRecord = encodedMessageLength <= MaxMessageContentLength;
+ if (oneRecord) {
+ encodeBuff = entry.content;
+ entry.segmentLength = encodedMessageLength;
+ }
+ else {
+ encodeStage.reset(new char[encodedMessageLength]);
+ encodeBuff = encodeStage.get();
+ entry.segmentLength = MaxMessageContentLength;
+ }
+ qpid::framing::Buffer buff(encodeBuff, encodedMessageLength);
+ msg->encode(buff);
+ if (!oneRecord)
+ memcpy_s(entry.content, sizeof(entry.content),
+ encodeBuff, entry.segmentLength);
+ uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+ entryLength -= (MaxMessageContentLength - entry.segmentLength);
+ location = write(&entry, entryLength);
+ // Write any Message-Chunk records before setting the message's id.
+ uint32_t sent = entry.segmentLength;
+ uint32_t remaining = encodedMessageLength - entry.segmentLength;
+ while (remaining > 0) {
+ MessageChunk chunk;
+ chunk.segmentLength = std::max(MaxMessageContentLength, remaining);
+ memcpy_s(chunk.content, sizeof(chunk.content),
+ encodeStage.get() + sent, chunk.segmentLength);
+ entryLength = static_cast<uint32_t>(sizeof(chunk));
+ entryLength -= (MaxMessageContentLength - chunk.segmentLength);
+ lastChunkLsn = write(&chunk, entryLength, &location);
+ sent += chunk.segmentLength;
+ remaining -= chunk.segmentLength;
+ }
+ return lsnToId(location);
+}
+
+void
+MessageLog::deleteMessage(uint64_t messageId, uint64_t newFirstId)
+{
+ MessageDelete deleteEntry;
+ CLFS_LSN msgLsn = idToLsn(messageId);
+ write(&deleteEntry, sizeof(deleteEntry), &msgLsn);
+ if (newFirstId != 0)
+ moveTail(idToLsn(newFirstId));
+}
+
+// Load part or all of a message's content from previously stored
+// log record(s).
+void
+MessageLog::loadContent(uint64_t messageId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+}
+
+void
+MessageLog::recordEnqueue (uint64_t messageId,
+ uint64_t queueId,
+ uint64_t transactionId)
+{
+ MessageEnqueue entry(queueId, transactionId);
+ CLFS_LSN msgLsn = idToLsn(messageId);
+ write(&entry, sizeof(entry), &msgLsn);
+}
+
+void
+MessageLog::recordDequeue (uint64_t messageId,
+ uint64_t queueId,
+ uint64_t transactionId)
+{
+ MessageDequeue entry(queueId, transactionId);
+ CLFS_LSN msgLsn = idToLsn(messageId);
+ write(&entry, sizeof(entry), &msgLsn);
+}
+
+void
+MessageLog::recover(qpid::broker::RecoveryManager& recoverer,
+ qpid::store::MessageMap& messageMap,
+ std::map<uint64_t, std::vector<RecoveredMsgOp> >& messageOps)
+{
+ // If context and content needs to be saved while reassembling messages
+ // split across log records, save the info and reassembly buffer.
+ struct MessageBlocks {
+ uint32_t totalLength;
+ uint32_t soFarLength;
+ boost::shared_ptr<char> content;
+
+ MessageBlocks() : totalLength(0), soFarLength(0), content((char*)0) {}
+ };
+ std::map<uint64_t, MessageBlocks> reassemblies;
+ std::map<uint64_t, MessageBlocks>::iterator at;
+
+ QPID_LOG(debug, "Recovering message log");
+
+ // Note that there may be message refs in the log which are deleted, so
+ // be sure to only add msgs at message-start record, and ignore those
+ // that don't have an existing message record.
+ // Get the base LSN - that's how to say "start reading at the beginning"
+ CLFS_INFORMATION info;
+ ULONG infoLength = sizeof (info);
+ BOOL ok = ::GetLogFileInformation(handle, &info, &infoLength);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Pointers for the various record types that can be assigned in the
+ // reading loop below.
+ MessageStart *start;
+ MessageChunk *chunk;
+ MessageEnqueue *enqueue;
+ MessageDequeue *dequeue;
+
+ qpid::store::MessageMap::iterator messageMapSpot;
+ qpid::store::MessageQueueMap::iterator queueMapSpot;
+ PVOID recordPointer;
+ ULONG recordLength;
+ CLFS_RECORD_TYPE recordType = ClfsDataRecord;
+ CLFS_LSN messageLsn, current, undoNext;
+ PVOID readContext;
+ uint64_t msgId;
+ // Note 'current' in case it's needed below; ReadNextLogRecord returns it
+ // via a parameter.
+ current = info.BaseLsn;
+ ok = ::ReadLogRecord(marshal,
+ &info.BaseLsn,
+ ClfsContextForward,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ &undoNext,
+ &messageLsn,
+ &readContext,
+ 0);
+ while (ok) {
+ // All the record types this class writes have a MessageEntryType in the
+ // beginning. Based on that, do what's needed.
+ MessageEntryType *t =
+ reinterpret_cast<MessageEntryType *>(recordPointer);
+ switch(*t) {
+ case MessageStartEntry:
+ start = reinterpret_cast<MessageStart *>(recordPointer);
+ msgId = lsnToId(current);
+ QPID_LOG(debug, "Message Start, id " << msgId);
+ // If the message content is split across multiple log records, save
+ // this content off to the side until the remaining record(s) are
+ // located.
+ if (start->totalLength == start->segmentLength) { // Whole thing
+ // Start by recovering the header then see if the rest of
+ // the content is desired.
+ qpid::framing::Buffer buff(start->content, start->headerLength);
+ qpid::broker::RecoverableMessage::shared_ptr m =
+ recoverer.recoverMessage(buff);
+ m->setPersistenceId(msgId);
+ messageMap[msgId] = m;
+ uint32_t contentLength =
+ start->totalLength - start->headerLength;
+ if (m->loadContent(contentLength)) {
+ qpid::framing::Buffer content(&(start->content[start->headerLength]),
+ contentLength);
+ m->decodeContent(content);
+ }
+ }
+ else {
+ // Save it in a block big enough.
+ MessageBlocks b;
+ b.totalLength = start->totalLength;
+ b.soFarLength = start->segmentLength;
+ b.content.reset(new char[b.totalLength]);
+ memcpy_s(b.content.get(), b.totalLength,
+ start->content, start->segmentLength);
+ reassemblies[msgId] = b;
+ }
+ break;
+ case MessageChunkEntry:
+ chunk = reinterpret_cast<MessageChunk *>(recordPointer);
+ // Remember, all entries chained to MessageStart via previous.
+ msgId = lsnToId(messageLsn);
+ QPID_LOG(debug, "Message Chunk for id " << msgId);
+ at = reassemblies.find(msgId);
+ if (at == reassemblies.end()) {
+ QPID_LOG(debug, "Message frag for " << msgId <<
+ " but no start; discarded");
+ }
+ else {
+ MessageBlocks *b = &(at->second);
+ if (b->soFarLength + chunk->segmentLength > b->totalLength)
+ throw std::runtime_error("Invalid message chunk length");
+ memcpy_s(b->content.get() + b->soFarLength,
+ b->totalLength - b->soFarLength,
+ chunk->content,
+ chunk->segmentLength);
+ b->soFarLength += chunk->segmentLength;
+ if (b->totalLength == b->soFarLength) {
+ qpid::framing::Buffer buff(b->content.get(),
+ b->totalLength);
+ qpid::broker::RecoverableMessage::shared_ptr m =
+ recoverer.recoverMessage(buff);
+ m->setPersistenceId(msgId);
+ messageMap[msgId] = m;
+ reassemblies.erase(at);
+ }
+ }
+ break;
+ case MessageDeleteEntry:
+ msgId = lsnToId(messageLsn);
+ QPID_LOG(debug, "Message Delete, id " << msgId);
+ messageMap.erase(msgId);
+ messageOps.erase(msgId);
+ break;
+ case MessageEnqueueEntry:
+ enqueue = reinterpret_cast<MessageEnqueue *>(recordPointer);
+ msgId = lsnToId(messageLsn);
+ QPID_LOG(debug, "Message " << msgId << " Enqueue on queue " <<
+ enqueue->queueId << ", txn " << enqueue->transId);
+ if (messageMap.find(msgId) == messageMap.end()) {
+ QPID_LOG(debug,
+ "Message " << msgId << " doesn't exist; discarded");
+ }
+ else {
+ std::vector<RecoveredMsgOp>& ops = messageOps[msgId];
+ RecoveredMsgOp op(RECOVERED_ENQUEUE,
+ enqueue->queueId,
+ enqueue->transId);
+ ops.push_back(op);
+ }
+ break;
+ case MessageDequeueEntry:
+ dequeue = reinterpret_cast<MessageDequeue *>(recordPointer);
+ msgId = lsnToId(messageLsn);
+ QPID_LOG(debug, "Message " << msgId << " Dequeue from queue " <<
+ dequeue->queueId);
+ if (messageMap.find(msgId) == messageMap.end()) {
+ QPID_LOG(debug,
+ "Message " << msgId << " doesn't exist; discarded");
+ }
+ else {
+ std::vector<RecoveredMsgOp>& ops = messageOps[msgId];
+ RecoveredMsgOp op(RECOVERED_DEQUEUE,
+ dequeue->queueId,
+ dequeue->transId);
+ ops.push_back(op);
+ }
+ break;
+ default:
+ throw std::runtime_error("Bad message log entry type");
+ }
+
+ recordType = ClfsDataRecord;
+ ok = ::ReadNextLogRecord(readContext,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ 0, // No userLsn
+ &undoNext,
+ &messageLsn,
+ &current,
+ 0);
+ }
+ DWORD status = ::GetLastError();
+ ::TerminateReadLog(readContext);
+ if (status == ERROR_HANDLE_EOF) { // No more records
+ QPID_LOG(debug, "Message log recovered");
+ return;
+ }
+ throw QPID_WINDOWS_ERROR(status);
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
new file mode 100644
index 0000000000..b3705287a6
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.h
@@ -0,0 +1,107 @@
+#ifndef QPID_STORE_MSCLFS_MESSAGELOG_H
+#define QPID_STORE_MSCLFS_MESSAGELOG_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/broker/RecoveryManager.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/store/StorageProvider.h>
+
+#include "Log.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+/**
+ * @class MessageLog
+ *
+ * Represents a CLFS-housed message log.
+ */
+class MessageLog : public Log {
+
+protected:
+ // Message log needs to have a no-op first record written in the log
+ // to ensure that no real message gets an ID 0.
+ virtual void initialize();
+
+public:
+ // Inherited and reimplemented from Log. Figure the minimum marshalling
+ // buffer size needed for the records this class writes.
+ virtual uint32_t marshallingBufferSize();
+
+ // Add the specified message to the log; Return the persistence Id.
+ uint64_t add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+ // Write a Delete entry for messageId. If newFirstId is not 0, it is now
+ // the earliest valid message in the log, so move the tail up to it.
+ void deleteMessage(uint64_t messageId, uint64_t newFirstId);
+
+ // Load part or all of a message's content from previously stored
+ // log record(s).
+ void loadContent(uint64_t messageId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ // Enqueue and dequeue operations track messages' transit across
+ // queues; each operation may be associated with a transaction. If
+ // the transactionId is 0 the operation is not associated with a
+ // transaction.
+ void recordEnqueue (uint64_t messageId,
+ uint64_t queueId,
+ uint64_t transactionId);
+ void recordDequeue (uint64_t messageId,
+ uint64_t queueId,
+ uint64_t transactionId);
+
+ // Recover the messages and their queueing records from the log.
+ // @param recoverer Recovery manager used to recreate broker objects from
+ // encoded framing buffers recovered from the log.
+ // @param messageMap This method fills in the map of id -> ptr of
+ // recovered messages.
+ // @param messageOps This method fills in the map of msg id ->
+ // vector of operations involving the message that were
+ // recovered from the log. It is the caller's
+ // responsibility to sort the operations out and
+ // ascertain which operations should be acted on. The
+ // order of operations in the vector is as they were
+ // read in order from the log.
+ typedef enum { RECOVERED_ENQUEUE = 1, RECOVERED_DEQUEUE } RecoveredOpType;
+ struct RecoveredMsgOp {
+ RecoveredOpType op;
+ uint64_t queueId;
+ uint64_t txnId;
+
+ RecoveredMsgOp(RecoveredOpType o, const uint64_t& q, const uint64_t& t)
+ : op(o), queueId(q), txnId(t) {}
+ };
+ void recover(qpid::broker::RecoveryManager& recoverer,
+ qpid::store::MessageMap& messageMap,
+ std::map<uint64_t, std::vector<RecoveredMsgOp> >& messageOps);
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_MESSAGELOG_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
new file mode 100644
index 0000000000..db5d2ebf4c
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.cpp
@@ -0,0 +1,472 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/log/Statement.h>
+
+#include "Messages.h"
+#include "Lsn.h"
+#include "qpid/store/StoreException.h"
+#include <boost/foreach.hpp>
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+void
+Messages::openLog(const std::string& path, const Log::TuningParameters& params)
+{
+ log.open (path, params);
+}
+
+void
+Messages::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+{
+ uint64_t id = log.add(msg);
+ msg->setPersistenceId(id);
+ std::auto_ptr<MessageInfo> autom(new MessageInfo);
+ MessageInfo::shared_ptr m(autom);
+ std::pair<uint64_t, MessageInfo::shared_ptr> p(id, m);
+ {
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+ messages.insert(p);
+ // If there's only this one message there, move the tail to it.
+ // This prevents the log from continually growing when messages
+ // are added and removed one at a time.
+ if (messages.size() == 1) {
+ CLFS_LSN newTail = idToLsn(id);
+ log.moveTail(newTail);
+ }
+ }
+}
+
+void
+Messages::enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t)
+{
+ MessageInfo::shared_ptr p;
+ {
+ qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+ MessageMap::const_iterator i = messages.find(msgId);
+ if (i == messages.end())
+ THROW_STORE_EXCEPTION("Message does not exist");
+ p = i->second;
+ }
+ MessageInfo::Location loc(queueId, t, MessageInfo::TRANSACTION_ENQUEUE);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ p->where.push_back(loc);
+ uint64_t transactionId = 0;
+ if (t.get() != 0) {
+ transactionId = t->getId();
+ t->enroll(msgId);
+ }
+ try {
+ log.recordEnqueue(msgId, queueId, transactionId);
+ }
+ catch (...) {
+ // Undo the record-keeping if the log wasn't written correctly.
+ if (transactionId != 0)
+ t->unenroll(msgId);
+ p->where.pop_back();
+ throw;
+ }
+ }
+}
+
+void
+Messages::dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t)
+{
+ MessageInfo::shared_ptr p;
+ {
+ qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+ MessageMap::const_iterator i = messages.find(msgId);
+ if (i == messages.end())
+ THROW_STORE_EXCEPTION("Message does not exist");
+ p = i->second;
+ }
+ {
+ // Locate the 'where' entry for the specified queue. Once this operation
+ // is recorded in the log, update the 'where' entry to reflect it.
+ // Note that an existing entry in 'where' that refers to a transaction
+ // is not eligible for this operation.
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i;
+ for (i = p->where.begin(); i != p->where.end(); ++i) {
+ if (i->queueId == queueId && i->transaction.get() == 0)
+ break;
+ }
+ if (i == p->where.end())
+ THROW_STORE_EXCEPTION("Message not on queue");
+ uint64_t transactionId = 0;
+ if (t.get() != 0) {
+ transactionId = t->getId();
+ t->enroll(msgId);
+ }
+ try {
+ log.recordDequeue(msgId, queueId, transactionId);
+ }
+ catch (...) {
+ // Undo the record-keeping if the log wasn't written correctly.
+ if (transactionId != 0)
+ t->unenroll(msgId);
+ throw;
+ }
+ // Ok, logged successfully. If this is a transactional op, note
+ // the transaction. If non-transactional, remove the 'where' entry.
+ if (transactionId != 0) {
+ i->transaction = t;
+ i->disposition = MessageInfo::TRANSACTION_DEQUEUE;
+ }
+ else {
+ p->where.erase(i);
+ // If the message doesn't exist on any other queues, remove it.
+ if (p->where.empty())
+ remove(msgId);
+ }
+ }
+}
+
+// Commit a previous provisional enqueue or dequeue of a particular message
+// actions under a specified transaction. If this results in the message's
+// being removed from all queues, it is deleted.
+void
+Messages::commit(uint64_t msgId, Transaction::shared_ptr& t)
+{
+ MessageInfo::shared_ptr p;
+ {
+ qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+ MessageMap::const_iterator i = messages.find(msgId);
+ if (i == messages.end())
+ THROW_STORE_EXCEPTION("Message does not exist");
+ p = i->second;
+ }
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i;
+ for (i = p->where.begin(); i != p->where.end(); ++i) {
+ if (i->transaction != t)
+ continue;
+ // Transactional dequeues can now remove the item from the
+ // where list; enqueues just clear the transaction reference.
+ if (i->disposition == MessageInfo::TRANSACTION_DEQUEUE)
+ i = p->where.erase(i);
+ else
+ i->transaction.reset();
+ }
+ }
+ // If committing results in this message having no further enqueue
+ // references, delete it. If the delete fails, swallow the exception
+ // and let recovery take care of removing it later.
+ if (p->where.empty()) {
+ try {
+ remove(msgId);
+ }
+ catch(...) {}
+ }
+}
+
+// Abort a previous provisional enqueue or dequeue of a particular message
+// actions under a specified transaction. If this results in the message's
+// being removed from all queues, it is deleted.
+void
+Messages::abort(uint64_t msgId, Transaction::shared_ptr& t)
+{
+ MessageInfo::shared_ptr p;
+ {
+ qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
+ MessageMap::const_iterator i = messages.find(msgId);
+ if (i == messages.end())
+ THROW_STORE_EXCEPTION("Message does not exist");
+ p = i->second;
+ }
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i = p->where.begin();
+ while (i != p->where.end()) {
+ if (i->transaction != t) {
+ ++i;
+ continue;
+ }
+ // Aborted transactional dequeues result in the message remaining
+ // enqueued like before the operation; enqueues clear the
+ // message from the where list - like the enqueue never happened.
+ if (i->disposition == MessageInfo::TRANSACTION_ENQUEUE)
+ i = p->where.erase(i);
+ else {
+ i->transaction.reset();
+ ++i;
+ }
+ }
+ }
+ // If aborting results in this message having no further enqueue
+ // references, delete it. If the delete fails, swallow the exception
+ // and let recovery take care of removing it later.
+ if (p->where.empty()) {
+ try {
+ remove(msgId);
+ }
+ catch(...) {}
+ }
+}
+
+// Load part or all of a message's content from previously stored
+// log record(s).
+void
+Messages::loadContent(uint64_t msgId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length)
+{
+ log.loadContent(msgId, data, offset, length);
+}
+
+// Recover the current set of messages and where they're queued from
+// the log.
+void
+Messages::recover(qpid::broker::RecoveryManager& recoverer,
+ const std::set<uint64_t> &validQueues,
+ const std::map<uint64_t, Transaction::shared_ptr>& transMap,
+ qpid::store::MessageMap& messageMap,
+ qpid::store::MessageQueueMap& messageQueueMap)
+{
+ std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> > messageOps;
+ log.recover(recoverer, messageMap, messageOps);
+ // Now read through the messageOps replaying the operations with the
+ // knowledge of which transactions committed, aborted, etc. A transaction
+ // should not be deleted until there are no messages referencing it so
+ // a message operation with a transaction id not found in transMap is
+ // a serious problem.
+ QPID_LOG(debug, "Beginning CLFS-recovered message operation replay");
+ // Keep track of any messages that are recovered from the log but don't
+ // have any place to be. This can happen, for example, if the broker
+ // crashes while logging a message deletion. After all the recovery is
+ // done, delete all the homeless messages.
+ std::vector<uint64_t> homeless;
+ std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> >::const_iterator msg;
+ for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) {
+ uint64_t msgId = msg->first;
+ const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second;
+ QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)");
+ MessageInfo::shared_ptr m(new MessageInfo);
+ std::vector<QueueEntry>& entries = messageQueueMap[msgId];
+ std::vector<MessageLog::RecoveredMsgOp>::const_iterator op;
+ for (op = ops.begin(); op != ops.end(); ++op) {
+ QueueEntry entry(op->queueId);
+ MessageInfo::Location loc(op->queueId);
+ std::string dir =
+ op->op == MessageLog::RECOVERED_ENQUEUE ? "enqueue"
+ : "dequeue";
+ if (validQueues.find(op->queueId) == validQueues.end()) {
+ QPID_LOG(info,
+ "Message " << msgId << dir << " on non-existant queue "
+ << op->queueId << "; dropped");
+ continue;
+ }
+ if (op->txnId != 0) {
+ // Be sure to enroll this message in the transaction even if
+ // it has committed or aborted. This ensures that the
+ // transaction isn't removed from the log while finalizing the
+ // recovery. If it were to be removed and the broker failed
+ // again before removing this message during normal operation,
+ // it couldn't be recovered again.
+ //
+ // Recall what is being reconstructed; 2 things:
+ // 1. This class's 'messages' list which keeps track
+ // of the queues each message is on and the transactions
+ // each message is enrolled in. For this, aborted
+ // transactions cause the result of the operation to be
+ // ignored, but the message does need to be enrolled in
+ // the transaction to properly maintain the transaction
+ // references until the message is deleted.
+ // 2. The StorageProvider's MessageQueueMap, which also
+ // has an entry for each queue each message is on and
+ // its TPL status and associated xid.
+ const Transaction::shared_ptr &t =
+ transMap.find(op->txnId)->second;
+ // Prepared transactions cause the operation to be
+ // provisionally acted on, and the message to be enrolled in
+ // the transaction for when it commits/aborts. This is
+ // noted in the QueueEntry for the StorageProvider's map.
+ if (t->getState() == Transaction::TRANS_PREPARED) {
+ QPID_LOG(debug, dir << " for queue " << op->queueId <<
+ ", prepared txn " << op->txnId);
+ TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t));
+ if (tpct.get() == 0)
+ THROW_STORE_EXCEPTION("Invalid transaction state");
+ t->enroll(msgId);
+ entry.xid = tpct->getXid();
+ loc.transaction = t;
+ if (op->op == MessageLog::RECOVERED_ENQUEUE) {
+ entry.tplStatus = QueueEntry::ADDING;
+ loc.disposition = MessageInfo::TRANSACTION_ENQUEUE;
+ }
+ else {
+ entry.tplStatus = QueueEntry::REMOVING;
+ loc.disposition = MessageInfo::TRANSACTION_DEQUEUE;
+ }
+ }
+ else if (t->getState() != Transaction::TRANS_COMMITTED) {
+ QPID_LOG(debug, dir << " for queue " << op->queueId <<
+ ", txn " << op->txnId << ", rolling back");
+ continue;
+ }
+ }
+ // Here for non-transactional and prepared transactional operations
+ // to set up the messageQueueMap entries. Note that at this point
+ // a committed transactional operation looks like a
+ // non-transactional one as far as the QueueEntry is
+ // concerned - just do it. If this is an entry enqueuing a
+ // message, just add it to the entries list. If it's a dequeue
+ // operation, locate the matching entry for the queue and delete
+ // it if the current op is non-transactional; if it's a prepared
+ // transaction then replace the existing entry with the current
+ // one that notes the message is enqueued but being removed under
+ // a prepared transaction.
+ QPID_LOG(debug, dir + " at queue " << entry.queueId);
+ if (op->op == MessageLog::RECOVERED_ENQUEUE) {
+ entries.push_back(entry);
+ m->where.push_back(loc);
+ }
+ else {
+ std::vector<QueueEntry>::iterator i = entries.begin();
+ while (i != entries.end()) {
+ if (i->queueId == entry.queueId) {
+ if (entry.tplStatus != QueueEntry::NONE)
+ *i = entry;
+ else
+ entries.erase(i);
+ break;
+ }
+ ++i;
+ }
+ std::list<MessageInfo::Location>::iterator w = m->where.begin();
+ while (w != m->where.end()) {
+ if (w->queueId == loc.queueId) {
+ if (loc.transaction.get() != 0) {
+ *w = loc;
+ ++w;
+ }
+ else {
+ w = m->where.erase(w);
+ }
+ }
+ }
+ }
+ }
+ // Now that all the queue entries have been set correctly, see if
+ // there are any entries; they may have all been removed during
+ // recovery. If there are none, add this message to the homeless
+ // list to be deleted from the log after the recovery is done.
+ if (m->where.size() == 0) {
+ homeless.push_back(msgId);
+ messageMap.erase(msgId);
+ messageQueueMap.erase(msgId);
+ }
+ else {
+ std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m);
+ messages.insert(p);
+ }
+ }
+
+ QPID_LOG(debug, "Message log recovery done.");
+ // Done! Ok, go back and delete all the homeless messages.
+ BOOST_FOREACH(uint64_t msg, homeless) {
+ QPID_LOG(debug, "Deleting homeless message " << msg);
+ remove(msg);
+ }
+}
+
+// Expunge is called when a queue is deleted. All references to that
+// queue must be expunged from all messages. 'Dequeue' log records are
+// written for each queue entry removed, but any errors are swallowed.
+// On recovery there's a list of valid queues passed in. The deleted
+// queue will not be on that list so if any references to it are
+// recovered they'll get weeded out then.
+void
+Messages::expunge(uint64_t queueId)
+{
+ std::vector<uint64_t> toBeDeleted; // Messages to be deleted later.
+
+ {
+ // Lock everybody out since all messages are possibly in play.
+ // There also may be other threads already working on particular
+ // messages so individual message mutex still must be acquired.
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+ MessageMap::iterator m;
+ for (m = messages.begin(); m != messages.end(); ++m) {
+ MessageInfo::shared_ptr p = m->second;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> ml(p->whereLock);
+ std::list<MessageInfo::Location>::iterator i = p->where.begin();
+ while (i != p->where.end()) {
+ if (i->queueId != queueId) {
+ ++i;
+ continue;
+ }
+ // If this entry is involved in a transaction, unenroll it.
+ // Then remove the entry.
+ if (i->transaction.get() != 0)
+ i->transaction->unenroll(m->first);
+ i = p->where.erase(i);
+ try {
+ log.recordDequeue(m->first, queueId, 0);
+ }
+ catch(...) {
+ }
+ }
+ if (p->where.size() == 0)
+ toBeDeleted.push_back(m->first);
+ }
+ }
+ }
+ // Swallow any exceptions during this; don't care. Recover it later
+ // if needed.
+ try {
+ BOOST_FOREACH(uint64_t msg, toBeDeleted)
+ remove(msg);
+ }
+ catch(...) {
+ }
+}
+
+// Remove a specified message from those controlled by this object.
+void
+Messages::remove(uint64_t messageId)
+{
+ uint64_t newFirstId = 0;
+ {
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
+ messages.erase(messageId);
+ // May have deleted the first entry; if so the log can release that.
+ // If this message being deleted results in an empty list of
+ // messages, move the tail up to this message's LSN. This may
+ // result in one or more messages being stranded in the log
+ // until there's more activity. If a restart happens while these
+ // unneeded log records are there, the presence of the MessageDelete
+ // entry will cause the message(s) to be ignored anyway.
+ if (messages.empty())
+ newFirstId = messageId;
+ else if (messages.begin()->first > messageId)
+ newFirstId = messages.begin()->first;
+ }
+ log.deleteMessage(messageId, newFirstId);
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Messages.h b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
new file mode 100644
index 0000000000..93cc8bfe62
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Messages.h
@@ -0,0 +1,144 @@
+#ifndef QPID_STORE_MSCLFS_MESSAGES_H
+#define QPID_STORE_MSCLFS_MESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <map>
+#include <set>
+#include <vector>
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/PersistableMessage.h>
+#include <qpid/sys/Mutex.h>
+
+#include "MessageLog.h"
+#include "Transaction.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+class Messages {
+
+ struct MessageInfo {
+ // How many queues this message is on, whether actually (non-transacted)
+ // or provisionally (included in a non-yet-committed transaction).
+ volatile LONG enqueuedCount;
+
+ // Keep a list of transactional operations this message is
+ // referenced in. When the transaction changes/finalizes these all
+ // need to be acted on.
+ typedef enum { TRANSACTION_NONE = 0,
+ TRANSACTION_ENQUEUE,
+ TRANSACTION_DEQUEUE } TransType;
+#if 0
+ std::map<Transaction::shared_ptr, std::vector<TransType> > transOps;
+ qpid::sys::Mutex transOpsLock;
+#endif
+ // Think what I need is a list of "where is this message" - queue id,
+ // transaction ref, what kind of trans op (enq/deq). Then "remove all
+ // queue refs" can search through all messages looking for queue ids
+ // and undo them. Write "remove from queue" record to log. Also need to
+ // add "remove from queue" to recovery.
+ struct Location {
+ uint64_t queueId;
+ Transaction::shared_ptr transaction;
+ TransType disposition;
+
+ Location(uint64_t q)
+ : queueId(q), transaction(), disposition(TRANSACTION_NONE) {}
+ Location(uint64_t q, Transaction::shared_ptr& t, TransType d)
+ : queueId(q), transaction(t), disposition(d) {}
+ };
+ qpid::sys::Mutex whereLock;
+ std::list<Location> where;
+ // The transactions vector just keeps a shared_ptr to each
+ // Transaction this message was involved in, regardless of the
+ // disposition or transaction state. Keeping a valid shared_ptr
+ // prevents the Transaction from being deleted. As long as there
+ // are any messages that referred to a transaction, that
+ // transaction's state needs to be known so the message disposition
+ // can be correctly recovered if needed.
+ std::vector<Transaction::shared_ptr> transactions;
+
+ typedef boost::shared_ptr<MessageInfo> shared_ptr;
+
+ MessageInfo() : enqueuedCount(0) {}
+ };
+
+ qpid::sys::RWlock lock;
+ typedef std::map<uint64_t, MessageInfo::shared_ptr> MessageMap;
+ MessageMap messages;
+ MessageLog log;
+
+ // Remove a specified message from those controlled by this object.
+ void remove(uint64_t messageId);
+
+public:
+ void openLog(const std::string& path, const Log::TuningParameters& params);
+
+ // Add the specified message to the log and list of known messages.
+ // Upon successful return the message's persistenceId is set.
+ void add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
+ // Add the specified queue to the message's list of places it is
+ // enqueued.
+ void enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t);
+
+ // Remove the specified queue from the message's list of places it is
+ // enqueued. If there are no other queues holding the message, it is
+ // deleted.
+ void dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t);
+
+ // Commit a previous provisional enqueue or dequeue of a particular message
+ // actions under a specified transaction. If this results in the message's
+ // being removed from all queues, it is deleted.
+ void commit(uint64_t msgId, Transaction::shared_ptr& transaction);
+
+ // Abort a previous provisional enqueue or dequeue of a particular message
+ // actions under a specified transaction. If this results in the message's
+ // being removed from all queues, it is deleted.
+ void abort(uint64_t msgId, Transaction::shared_ptr& transaction);
+
+ // Load part or all of a message's content from previously stored
+ // log record(s).
+ void loadContent(uint64_t msgId,
+ std::string& data,
+ uint64_t offset,
+ uint32_t length);
+
+ // Expunge is called when a queue is deleted. All references to that
+ // queue must be expunged from all messages.
+ void expunge(uint64_t queueId);
+
+ // Recover the current set of messages and where they're queued from
+ // the log.
+ void recover(qpid::broker::RecoveryManager& recoverer,
+ const std::set<uint64_t> &validQueues,
+ const std::map<uint64_t, Transaction::shared_ptr>& transMap,
+ qpid::store::MessageMap& messageMap,
+ qpid::store::MessageQueueMap& messageQueueMap);
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_MESSAGES_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp
new file mode 100644
index 0000000000..f94fef6f84
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.cpp
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Transaction.h"
+#include "Messages.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+Transaction::~Transaction()
+{
+ // Transactions that are recovered then found to be deleted get destroyed
+ // but need not be logged.
+ if (state != TRANS_DELETED)
+ log->deleteTransaction(id);
+}
+
+void
+Transaction::enroll(uint64_t msgId)
+{
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(enrollLock);
+ enrolledMessages.push_back(msgId);
+}
+
+void
+Transaction::unenroll(uint64_t msgId)
+{
+ qpid::sys::ScopedWlock<qpid::sys::RWlock> l(enrollLock);
+ for (std::vector<uint64_t>::iterator i = enrolledMessages.begin();
+ i < enrolledMessages.end();
+ ++i) {
+ if (*i == msgId) {
+ enrolledMessages.erase(i);
+ break;
+ }
+ }
+}
+
+void
+Transaction::abort(Messages& messages)
+{
+ log->recordAbort(id);
+ for (size_t i = 0; i < enrolledMessages.size(); ++i)
+ messages.abort(enrolledMessages[i], shared_from_this());
+ state = TRANS_ABORTED;
+}
+
+void
+Transaction::commit(Messages& messages)
+{
+ log->recordCommit(id);
+ for (size_t i = 0; i < enrolledMessages.size(); ++i)
+ messages.commit(enrolledMessages[i], shared_from_this());
+ state = TRANS_COMMITTED;
+}
+
+void
+TPCTransaction::prepare(void)
+{
+ log->recordPrepare(id);
+ state = TRANS_PREPARED;
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h
new file mode 100644
index 0000000000..fd07f2fb2e
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h
@@ -0,0 +1,146 @@
+#ifndef QPID_STORE_MSCLFS_TRANSACTION_H
+#define QPID_STORE_MSCLFS_TRANSACTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/TransactionalStore.h>
+#include <qpid/sys/Mutex.h>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+#include "TransactionLog.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+class Messages;
+
+/**
+ * @class Transaction
+ *
+ * Class representing an AMQP transaction. This is used around a set of
+ * enqueue and dequeue operations that occur when the broker is acting
+ * on a transaction commit/abort from the client.
+ * This class is what the store uses internally to implement things a
+ * transaction needs; the broker knows about TransactionContext, which
+ * holds a pointer to Transaction.
+ *
+ * NOTE: All references to Transactions (and TPCTransactions, below) are
+ * through Boost shared_ptr instances. All messages enrolled in a transaction
+ * hold a shared_ptr. Thus, a Transaction object will not be deleted until all
+ * messages holding a reference to it are deleted. This fact is also used
+ * during recovery to automatically clean up and delete any Transaction without
+ * messages left referring to it.
+ */
+class Transaction : public boost::enable_shared_from_this<Transaction> {
+private:
+ // TransactionLog has to create all Transaction instances.
+ Transaction() {}
+
+public:
+
+ typedef boost::shared_ptr<Transaction> shared_ptr;
+ typedef enum { TRANS_OPEN = 1,
+ TRANS_PREPARED,
+ TRANS_ABORTED,
+ TRANS_COMMITTED,
+ TRANS_DELETED } State;
+
+ virtual ~Transaction();
+
+ uint64_t getId() { return id; }
+ State getState() { return state; }
+
+ void enroll(uint64_t msgId);
+ void unenroll(uint64_t msgId); // For failed ops, not normal end-of-trans
+
+ void abort(Messages& messages);
+ void commit(Messages& messages);
+
+protected:
+ friend class TransactionLog;
+ Transaction(uint64_t _id, const TransactionLog::shared_ptr& _log)
+ : id(_id), state(TRANS_OPEN), log(_log) {}
+
+ uint64_t id;
+ State state;
+ TransactionLog::shared_ptr log;
+ std::vector<uint64_t> enrolledMessages;
+ qpid::sys::RWlock enrollLock;
+};
+
+class TransactionContext : public qpid::broker::TransactionContext {
+ Transaction::shared_ptr transaction;
+
+public:
+ TransactionContext(const Transaction::shared_ptr& _transaction)
+ : transaction(_transaction) {}
+
+ virtual Transaction::shared_ptr& getTransaction() { return transaction; }
+};
+
+/**
+ * @class TPCTransaction
+ *
+ * Class representing a Two-Phase-Commit (TPC) AMQP transaction. This is
+ * used around a set of enqueue and dequeue operations that occur when the
+ * broker is acting on a transaction prepare/commit/abort from the client.
+ * This class is what the store uses internally to implement things a
+ * transaction needs; the broker knows about TPCTransactionContext, which
+ * holds a pointer to TPCTransaction.
+ */
+class TPCTransaction : public Transaction {
+
+ friend class TransactionLog;
+ TPCTransaction(uint64_t _id,
+ const TransactionLog::shared_ptr& _log,
+ const std::string& _xid)
+ : Transaction(_id, _log), xid(_xid) {}
+
+ std::string xid;
+
+public:
+ typedef boost::shared_ptr<TPCTransaction> shared_ptr;
+
+ virtual ~TPCTransaction() {}
+
+ void prepare();
+ bool isPrepared() const { return state == TRANS_PREPARED; }
+
+ const std::string& getXid(void) const { return xid; }
+};
+
+class TPCTransactionContext : public qpid::broker::TPCTransactionContext {
+ TPCTransaction::shared_ptr transaction;
+
+public:
+ TPCTransactionContext(const TPCTransaction::shared_ptr& _transaction)
+ : transaction(_transaction) {}
+
+ virtual TPCTransaction::shared_ptr& getTransaction() { return transaction; }
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_TRANSACTION_H */
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
new file mode 100644
index 0000000000..04780e83e8
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp
@@ -0,0 +1,428 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <windows.h>
+#include <clfsw32.h>
+#include <exception>
+#include <malloc.h>
+#include <memory.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/windows/check.h>
+
+#include "TransactionLog.h"
+#include "Transaction.h"
+#include "Lsn.h"
+
+namespace {
+
+// Structures that hold log records. Each has a type field at the start.
+enum TransactionEntryType {
+ TransactionStartDtxEntry = 1,
+ TransactionStartTxEntry = 2,
+ TransactionPrepareEntry = 3,
+ TransactionCommitEntry = 4,
+ TransactionAbortEntry = 5,
+ TransactionDeleteEntry = 6
+};
+// The only thing that really takes up space in transaction records is the
+// xid. Max xid length is in the neighborhood of 600 bytes. Leave some room.
+static const uint32_t MaxTransactionContentLength = 1024;
+
+// Dtx-Start
+struct TransactionStartDtx {
+ TransactionEntryType type;
+ uint32_t length;
+ char content[MaxTransactionContentLength];
+
+ TransactionStartDtx()
+ : type(TransactionStartDtxEntry), length(0) {}
+};
+// Tx-Start
+struct TransactionStartTx {
+ TransactionEntryType type;
+
+ TransactionStartTx()
+ : type(TransactionStartTxEntry) {}
+};
+// Prepare
+struct TransactionPrepare {
+ TransactionEntryType type;
+
+ TransactionPrepare()
+ : type(TransactionPrepareEntry) {}
+};
+// Commit
+struct TransactionCommit {
+ TransactionEntryType type;
+
+ TransactionCommit()
+ : type(TransactionCommitEntry) {}
+};
+// Abort
+struct TransactionAbort {
+ TransactionEntryType type;
+
+ TransactionAbort()
+ : type(TransactionAbortEntry) {}
+};
+// Delete
+struct TransactionDelete {
+ TransactionEntryType type;
+
+ TransactionDelete()
+ : type(TransactionDeleteEntry) {}
+};
+
+} // namespace
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+void
+TransactionLog::initialize()
+{
+ // Write something to occupy the first record, preventing a real
+ // transaction from being lsn/id 0. Delete of a non-existant id is easily
+ // tossed during recovery if no other transactions have caused the tail
+ // to be moved up past this dummy record by then.
+ deleteTransaction(0);
+}
+
+uint32_t
+TransactionLog::marshallingBufferSize()
+{
+ size_t biggestNeed = sizeof(TransactionStartDtx);
+ uint32_t defSize = static_cast<uint32_t>(biggestNeed);
+ uint32_t minSize = Log::marshallingBufferSize();
+ if (defSize <= minSize)
+ return minSize;
+ // Round up to multiple of minSize
+ return (defSize + minSize) / minSize * minSize;
+}
+
+// Get a new Transaction
+boost::shared_ptr<Transaction>
+TransactionLog::begin()
+{
+ TransactionStartTx entry;
+ CLFS_LSN location;
+ uint64_t id;
+ uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+ location = write(&entry, entryLength);
+ try {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ id = lsnToId(location);
+ std::auto_ptr<Transaction> t(new Transaction(id, shared_from_this()));
+ boost::shared_ptr<Transaction> t2(t);
+ boost::weak_ptr<Transaction> weak_t2(t2);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[id] = weak_t2;
+ }
+ return t2;
+ }
+ catch(...) {
+ deleteTransaction(id);
+ throw;
+ }
+}
+
+// Get a new TPCTransaction
+boost::shared_ptr<TPCTransaction>
+TransactionLog::begin(const std::string& xid)
+{
+ TransactionStartDtx entry;
+ CLFS_LSN location;
+ uint64_t id;
+ uint32_t entryLength = static_cast<uint32_t>(sizeof(entry));
+ entry.length = static_cast<uint32_t>(xid.length());
+ memcpy_s(entry.content, sizeof(entry.content),
+ xid.c_str(), xid.length());
+ entryLength -= (sizeof(entry.content) - entry.length);
+ location = write(&entry, entryLength);
+ try {
+ id = lsnToId(location);
+ std::auto_ptr<TPCTransaction> t(new TPCTransaction(id,
+ shared_from_this(),
+ xid));
+ boost::shared_ptr<TPCTransaction> t2(t);
+ boost::weak_ptr<Transaction> weak_t2(t2);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[id] = weak_t2;
+ }
+ return t2;
+ }
+ catch(...) {
+ deleteTransaction(id);
+ throw;
+ }
+}
+
+void
+TransactionLog::recordPrepare(uint64_t transId)
+{
+ TransactionPrepare entry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&entry, sizeof(entry), &transLsn);
+}
+
+void
+TransactionLog::recordCommit(uint64_t transId)
+{
+ TransactionCommit entry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&entry, sizeof(entry), &transLsn);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[transId].reset();
+ }
+}
+
+void
+TransactionLog::recordAbort(uint64_t transId)
+{
+ TransactionAbort entry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&entry, sizeof(entry), &transLsn);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds[transId].reset();
+ }
+}
+
+void
+TransactionLog::deleteTransaction(uint64_t transId)
+{
+ uint64_t newFirstId = 0;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ validIds.erase(transId);
+ // May have deleted the first entry; if so the log can release that.
+ // If this deletion results in an empty list of transactions,
+ // move the tail up to this transaction's LSN. This may result in
+ // one or more transactions being stranded in the log until there's
+ // more activity. If a restart happens while these unneeded log
+ // records are there, the presence of the TransactionDelete
+ // entry will cause them to be ignored anyway.
+ if (validIds.empty())
+ newFirstId = transId;
+ else if (validIds.begin()->first > transId)
+ newFirstId = validIds.begin()->first;
+ }
+ TransactionDelete deleteEntry;
+ CLFS_LSN transLsn = idToLsn(transId);
+ write(&deleteEntry, sizeof(deleteEntry), &transLsn);
+ if (newFirstId != 0)
+ moveTail(idToLsn(newFirstId));
+}
+
+void
+TransactionLog::collectPreparedXids(std::map<std::string, TPCTransaction::shared_ptr>& preparedMap)
+{
+ // Go through all the known transactions; if the transaction is still
+ // valid (open or prepared) it will have weak_ptr to the Transaction.
+ // If it can be downcast and has a state of TRANS_PREPARED, add to the map.
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(idsLock);
+ std::map<uint64_t, boost::weak_ptr<Transaction> >::const_iterator i;
+ for (i = validIds.begin(); i != validIds.end(); ++i) {
+ Transaction::shared_ptr t = i->second.lock();
+ if (t.get() == 0)
+ continue;
+ TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t));
+ if (tpct.get() == 0)
+ continue;
+ if (tpct->state == Transaction::TRANS_PREPARED)
+ preparedMap[tpct->getXid()] = tpct;
+ }
+}
+
+void
+TransactionLog::recover(std::map<uint64_t, Transaction::shared_ptr>& transMap)
+{
+ QPID_LOG(debug, "Recovering transaction log");
+
+ // Note that there may be transaction refs in the log which are deleted,
+ // so be sure to only add transactions at Start records, and ignore those
+ // that don't have an existing message record.
+ // Get the base LSN - that's how to say "start reading at the beginning"
+ CLFS_INFORMATION info;
+ ULONG infoLength = sizeof (info);
+ BOOL ok = ::GetLogFileInformation(handle, &info, &infoLength);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Pointers for the various record types that can be assigned in the
+ // reading loop below.
+ TransactionStartDtx *startDtxEntry;
+ TransactionStartTx *startTxEntry;
+
+ PVOID recordPointer;
+ ULONG recordLength;
+ CLFS_RECORD_TYPE recordType = ClfsDataRecord;
+ CLFS_LSN transLsn, current, undoNext;
+ PVOID readContext;
+ uint64_t transId;
+ // Note 'current' in case it's needed below; ReadNextLogRecord returns it
+ // via a parameter.
+ current = info.BaseLsn;
+ ok = ::ReadLogRecord(marshal,
+ &info.BaseLsn,
+ ClfsContextForward,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ &undoNext,
+ &transLsn,
+ &readContext,
+ 0);
+
+ std::auto_ptr<Transaction> tPtr;
+ std::auto_ptr<TPCTransaction> tpcPtr;
+ while (ok) {
+ std::string xid;
+
+ // All the record types this class writes have a TransactionEntryType
+ // in the beginning. Based on that, do what's needed.
+ TransactionEntryType *t =
+ reinterpret_cast<TransactionEntryType *>(recordPointer);
+ switch(*t) {
+ case TransactionStartDtxEntry:
+ startDtxEntry =
+ reinterpret_cast<TransactionStartDtx *>(recordPointer);
+ transId = lsnToId(current);
+ QPID_LOG(debug, "Dtx start, id " << transId);
+ xid.assign(startDtxEntry->content, startDtxEntry->length);
+ tpcPtr.reset(new TPCTransaction(transId, shared_from_this(), xid));
+ transMap[transId] = boost::shared_ptr<TPCTransaction>(tpcPtr);
+ break;
+ case TransactionStartTxEntry:
+ startTxEntry =
+ reinterpret_cast<TransactionStartTx *>(recordPointer);
+ transId = lsnToId(current);
+ QPID_LOG(debug, "Tx start, id " << transId);
+ tPtr.reset(new Transaction(transId, shared_from_this()));
+ transMap[transId] = boost::shared_ptr<Transaction>(tPtr);
+ break;
+ case TransactionPrepareEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Dtx prepare, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Dtx " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_PREPARED;
+ }
+ break;
+ case TransactionCommitEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Txn commit, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Txn " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_COMMITTED;
+ }
+ break;
+ case TransactionAbortEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Txn abort, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Txn " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_ABORTED;
+ }
+ break;
+ case TransactionDeleteEntry:
+ transId = lsnToId(transLsn);
+ QPID_LOG(debug, "Txn delete, id " << transId);
+ if (transMap.find(transId) == transMap.end()) {
+ QPID_LOG(debug,
+ "Txn " << transId << " doesn't exist; discarded");
+ }
+ else {
+ transMap[transId]->state = Transaction::TRANS_DELETED;
+ transMap.erase(transId);
+ }
+ break;
+ default:
+ throw std::runtime_error("Bad transaction log entry type");
+ }
+
+ recordType = ClfsDataRecord;
+ ok = ::ReadNextLogRecord(readContext,
+ &recordPointer,
+ &recordLength,
+ &recordType,
+ 0, // No userLsn
+ &undoNext,
+ &transLsn,
+ &current,
+ 0);
+ }
+ DWORD status = ::GetLastError();
+ ::TerminateReadLog(readContext);
+ if (status != ERROR_HANDLE_EOF) // No more records
+ throw QPID_WINDOWS_ERROR(status);
+
+ QPID_LOG(debug, "Transaction log recovered");
+
+ // At this point we have a list of all the not-deleted transactions that
+ // were in existence when the broker last ran. All transactions of both
+ // Dtx and Tx types that haven't prepared or committed will be aborted.
+ // This will give the proper background against which to decide each
+ // message's disposition when recovering messages that were involved in
+ // transactions.
+ // In addition to recovering and aborting transactions, rebuild the
+ // validIds map now that we know which ids are really valid.
+ std::map<uint64_t, Transaction::shared_ptr>::const_iterator i;
+ for (i = transMap.begin(); i != transMap.end(); ++i) {
+ switch(i->second->state) {
+ case Transaction::TRANS_OPEN:
+ QPID_LOG(debug, "Txn " << i->first << " was open; aborted");
+ i->second->state = Transaction::TRANS_ABORTED;
+ break;
+ case Transaction::TRANS_ABORTED:
+ QPID_LOG(debug, "Txn " << i->first << " was aborted");
+ break;
+ case Transaction::TRANS_COMMITTED:
+ QPID_LOG(debug, "Txn " << i->first << " was committed");
+ break;
+ case Transaction::TRANS_PREPARED:
+ QPID_LOG(debug, "Txn " << i->first << " was prepared");
+ break;
+ case Transaction::TRANS_DELETED:
+ QPID_LOG(error,
+ "Txn " << i->first << " was deleted; shouldn't be here");
+ break;
+ }
+ boost::weak_ptr<Transaction> weak_txn(i->second);
+ validIds[i->first] = weak_txn;
+ }
+}
+
+}}} // namespace qpid::store::ms_clfs
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
new file mode 100644
index 0000000000..7ca27c229e
--- /dev/null
+++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.h
@@ -0,0 +1,104 @@
+#ifndef QPID_STORE_MSCLFS_TRANSACTIONLOG_H
+#define QPID_STORE_MSCLFS_TRANSACTIONLOG_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <set>
+
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <qpid/broker/RecoveryManager.h>
+#include <qpid/sys/IntegerTypes.h>
+#include <qpid/sys/Mutex.h>
+
+#include "Log.h"
+
+namespace qpid {
+namespace store {
+namespace ms_clfs {
+
+class Transaction;
+class TPCTransaction;
+
+/**
+ * @class TransactionLog
+ *
+ * Represents a CLFS-housed transaction log.
+ */
+class TransactionLog : public Log,
+ public boost::enable_shared_from_this<TransactionLog> {
+
+ // To know when it's ok to move the log tail the lowest valid Id must
+ // always be known. Keep track of valid Ids here. These are transactions
+ // which have not yet been Deleted in the log. They may be new, in progress,
+ // prepared, committed, or aborted - but not deleted.
+ // Entries corresponding to not-yet-finalized transactions (i.e., open or
+ // prepared) also have a weak_ptr so the Transaction can be accessed.
+ // This is primarily to check its state and get a list of prepared Xids.
+ std::map<uint64_t, boost::weak_ptr<Transaction> > validIds;
+ qpid::sys::Mutex idsLock;
+
+protected:
+ // Transaction log needs to have a no-op first record written in the log
+ // to ensure that no real transaction gets an ID 0; messages think trans
+ // id 0 means "no transaction."
+ virtual void initialize();
+
+public:
+ // Inherited and reimplemented from Log. Figure the minimum marshalling
+ // buffer size needed for the records this class writes.
+ virtual uint32_t marshallingBufferSize();
+
+ typedef boost::shared_ptr<TransactionLog> shared_ptr;
+
+ // Get a new Transaction
+ boost::shared_ptr<Transaction> begin();
+
+ // Get a new TPCTransaction
+ boost::shared_ptr<TPCTransaction> begin(const std::string& xid);
+
+ void recordPrepare(uint64_t transId);
+ void recordCommit(uint64_t transId);
+ void recordAbort(uint64_t transId);
+ void deleteTransaction(uint64_t transId);
+
+ // Fill @arg preparedMap with Xid->TPCTransaction::shared_ptr for all
+ // currently prepared transactions.
+ void collectPreparedXids(std::map<std::string, boost::shared_ptr<TPCTransaction> >& preparedMap);
+
+ // Recover the transactions and their state from the log.
+ // Every non-deleted transaction recovered from the log will be
+ // represented in @arg transMap. The recovering messages can use this
+ // information to tell if a transaction referred to in an enqueue/dequeue
+ // operation should be recovered or dropped by examining transaction state.
+ //
+ // @param recoverer Recovery manager used to recreate broker objects from
+ // entries recovered from the log.
+ // @param transMap This method fills in the map of id -> shared_ptr of
+ // recovered transactions.
+ void recover(std::map<uint64_t, boost::shared_ptr<Transaction> >& transMap);
+};
+
+}}} // namespace qpid::store::ms_clfs
+
+#endif /* QPID_STORE_MSCLFS_TRANSACTIONLOG_H */