summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp285
1 files changed, 285 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp b/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp
new file mode 100644
index 0000000000..6ea31f8401
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp
@@ -0,0 +1,285 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <transact.h>
+#include <xolehlp.h>
+#include <txdtc.h>
+#include <oletx2xa.h>
+#include <iostream>
+#include <fstream>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/FrameSet.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "DtxResourceManager.h"
+#include "XaTransaction.h"
+#include "QpidException.h"
+#include "QpidMarshal.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
+using namespace msclr;
+
+
+/*
+ * There is one DtxResourceManager per broker and per application process.
+ *
+ * Each RM manages a collection of active XaTransaction objects. Participating AmqpSessions enlist
+ * (or re-enlist) with an XaTransaction indexed by the corresponding System.Transaction object. The
+ * RM maintains its own AmqpSession for sending 2PC commnds (dtxPrepare, dtxCommit etc.). The
+ * XaTransaction object works through the lifecycle of the Transaction, including prompting the
+ * enlisted sessions to send their delimiting dtxEnd commands.
+ *
+ * A separate DtcPlugin.cpp file provides the recovery logic when needed in a library named
+ * qpidxarm.dll. The MSDTC maintans recovery info in its log and tracks when there may be
+ * transactions in doubt. See the documentation for IDtcToXaHelperSinglePipe.
+ *
+ * To enable transaction support:
+ * DTC requires a registry key to find the plugin
+ * [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\MSDTC\XADLL] qpidxarm.dll -> [path to qpidxarm.dll]
+ * DTC needs to be configured for XA
+ * cmdprompt -> dcomcnfg -> Component services -> My Computer -> DTC -> Local DTC -> right click properties -> Security -> Enable XA Transactions
+ *
+ */
+
+// TODO: provide shutdown mechanism, perhaps callback from Connection Idle for enlisted connections.
+// But note that a new RM registration with the DTC is very expensive.
+
+
+DtxResourceManager::DtxResourceManager(AmqpConnection^ appConnection) {
+ dtcComp = NULL;
+ xaHelperp = NULL;
+ rmCookie = 0;
+ doubtCount = 0;
+ tmDown = false;
+ AmqpConnection^ clonedCon = appConnection->Clone();
+ dtxControlSession = clonedCon->CreateSession();
+ dataSourceName = clonedCon->DataSourceName;
+ transactionMap = gcnew Collections::Generic::Dictionary<Transaction^, XaTransaction^>();
+
+ HRESULT hr;
+
+ try {
+ // instead of pinning this instance, just use tmp stack variables for small stuff
+ IUnknown* tmp = NULL;
+ // request the default DTC
+ hr = DtcGetTransactionManager(NULL, NULL, IID_IUnknown, 0, 0, 0, (void **)&tmp);
+ if (hr != S_OK)
+ throw gcnew QpidException("connection failure to DTC service");
+ dtcComp = tmp;
+
+ IDtcToXaHelperSinglePipe *tmp2 = NULL;
+ hr = ((IUnknown *)dtcComp)->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void**) &tmp2);
+ if (hr != S_OK)
+ throw gcnew QpidException("DTC XA unavailable");
+ xaHelperp = tmp2;
+
+ std::string native_dsn = QpidMarshal::ToNative(dataSourceName);
+ DWORD tmp3;
+
+ // This call doesn't return until the DTC has opened and closed a connection to the broker
+ // and written a recovery entry in its log.
+ hr = ((IDtcToXaHelperSinglePipe *) xaHelperp)->XARMCreate(const_cast<char *>(native_dsn.c_str()), "qpidxarm.dll", &tmp3);
+ if (hr != S_OK) {
+ switch (hr) {
+ case E_FAIL:
+ throw gcnew QpidException("Resource Manager DLL configuration error");
+ case E_INVALIDARG:
+ throw gcnew QpidException("Resource Manager internal error");
+ case E_OUTOFMEMORY:
+ throw gcnew QpidException("Resource Manager out of memory");
+ case E_UNEXPECTED:
+ throw gcnew QpidException("Resource Manager internal failure");
+ case XACT_E_TMNOTAVAILABLE:
+ case XACT_E_CONNECTION_DOWN:
+ throw gcnew QpidException("MSDTC unavailable");
+
+ default:
+ throw gcnew QpidException("Resource Manager Registration failed");
+ }
+ }
+
+ rmCookie = tmp3;
+ }
+ finally {
+ if (rmCookie == 0) {
+ // undo partial construction
+ Cleanup();
+ }
+ }
+}
+
+
+DtxResourceManager::!DtxResourceManager() {
+ Cleanup();
+}
+
+
+DtxResourceManager::~DtxResourceManager() {
+ GC::SuppressFinalize(this);
+ Cleanup();
+}
+
+
+// Called when the DTC COM proxy sends TMDOWN to a pending XaTransaction
+// called once for each outstanding tx
+
+void DtxResourceManager::TmDown() {
+ // this block is the only place where both locks are held
+ lock l1(transactionMap);
+ lock l2(resourceManagerMap);
+ if (tmDown)
+ return;
+
+ tmDown = true;
+ resourceManagerMap->Remove(this->dataSourceName);
+ // defer cleanup until last TmDown notification received
+}
+
+
+
+void DtxResourceManager::Cleanup() {
+ for each (Collections::Generic::KeyValuePair<Transaction^, XaTransaction^> kvp in transactionMap) {
+ XaTransaction^ xaTr = kvp.Value;
+ xaTr->ChildFinalize();
+ }
+
+ try {
+ if (rmCookie != 0) {
+ // implies no recovery needed
+ bool cleanSession = (doubtCount == 0) && (transactionMap->Count == 0);
+ ((IDtcToXaHelperSinglePipe *)xaHelperp)->ReleaseRMCookie(rmCookie, cleanSession);
+ rmCookie = 0;
+ }
+
+
+ if (xaHelperp != NULL) {
+ ((IDtcToXaHelperSinglePipe *) xaHelperp)->Release();
+ xaHelperp = NULL;
+ }
+
+ if (dtcComp != NULL) {
+ ((IUnknown *) dtcComp)->Release();
+ dtcComp = NULL;
+ }
+
+ if (dtxControlSession != nullptr) {
+ dtxControlSession->Connection->Close();
+ }
+
+ }
+ catch (Exception^) {}
+}
+
+
+XaTransaction^ DtxResourceManager::GetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) {
+ // find or create the RM instance associated with the session's broker
+ AmqpConnection^ connection = appSession->Connection;
+ DtxResourceManager^ instance = connection->CachedResourceManager;
+
+ // try cached rm first
+ if (instance != nullptr) {
+ XaTransaction^ xaTx = instance->InternalGetXaTransaction(appSession, transaction);
+ if (xaTx != nullptr)
+ return xaTx;
+ else {
+ // cached version no longer available, force new rm creation
+ connection->CachedResourceManager = nullptr;
+ }
+ }
+
+ lock l(resourceManagerMap);
+ String^ dsn = connection->DataSourceName;
+ if (!resourceManagerMap->TryGetValue(dsn, instance)) {
+ instance = gcnew DtxResourceManager(connection->Clone());
+ resourceManagerMap->Add(dsn, instance);
+ connection->CachedResourceManager = instance;
+ }
+ l.release();
+
+ return instance->InternalGetXaTransaction(appSession, transaction);
+}
+
+
+XaTransaction^ DtxResourceManager::InternalGetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) {
+ // find or create the tx proxy instance associated with the DTC transaction
+ lock l(transactionMap);
+ if (tmDown)
+ return nullptr;
+
+ XaTransaction^ xaTransaction = nullptr;
+ if (!transactionMap->TryGetValue(transaction, xaTransaction)) {
+ xaTransaction = gcnew XaTransaction(transaction, (IDtcToXaHelperSinglePipe *) xaHelperp, rmCookie, this);
+ transactionMap->Add(transaction, xaTransaction);
+ }
+
+ return xaTransaction;
+}
+
+void DtxResourceManager::Complete(Transaction ^tx) {
+ lock l(transactionMap);
+ transactionMap->Remove(tx);
+
+ if (tmDown && (transactionMap->Count == 0)) {
+ // no more activity on this instance
+ GC::SuppressFinalize(this);
+ Cleanup();
+ }
+}
+
+
+void DtxResourceManager::IncrementDoubt() {
+ Interlocked::Increment(doubtCount);
+}
+
+
+void DtxResourceManager::DecrementDoubt() {
+ Interlocked::Decrement(doubtCount);
+}
+
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+void DtxResourceManager::ForceRecovery(Transaction ^tx) {
+ lock l(resourceManagerMap);
+ for each (Collections::Generic::KeyValuePair<System::String^, DtxResourceManager^> kvp in resourceManagerMap) {
+
+ Collections::Generic::Dictionary<Transaction^, XaTransaction^>^ txmap = kvp.Value->transactionMap;
+ XaTransaction^ xaTransaction = nullptr;
+ lock l2(txmap);
+ if (txmap->TryGetValue(tx, xaTransaction)) {
+ xaTransaction->ForceRecovery();
+ }
+ }
+}
+#endif
+
+}}} // namespace Apache::Qpid::Interop