summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp525
1 files changed, 525 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp b/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
new file mode 100644
index 0000000000..23743316ff
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
@@ -0,0 +1,525 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <transact.h>
+#include <xolehlp.h>
+#include <txdtc.h>
+#include <oletx2xa.h>
+#include <iostream>
+#include <fstream>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/Xid.h"
+
+#include "QpidException.h"
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "DtxResourceManager.h"
+#include "XaTransaction.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
+using namespace msclr;
+
+using namespace qpid::framing::dtx;
+
+// ------------------------------------------------------------------------
+// Start of a pure native code section
+#pragma unmanaged
+// ------------------------------------------------------------------------
+
+// This is the native COM object the DTC expects to talk to for coordination.
+// There is exactly one native instance of this for each managed XaTransaction object.
+
+
+class DtcCallbackHandler : public ITransactionResourceAsync
+{
+private:
+ long useCount;
+ DtcCallbackFp managedCallback;
+public:
+ ITransactionEnlistmentAsync *txHandle;
+ DtcCallbackHandler(DtcCallbackFp cbp) : managedCallback(cbp), useCount(0) {}
+ ~DtcCallbackHandler() {}
+ virtual HRESULT __stdcall PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase);
+ virtual HRESULT __stdcall CommitRequest(DWORD grfrm, XACTUOW *unused);
+ virtual HRESULT __stdcall AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3);
+
+ virtual HRESULT __stdcall TMDown();
+ virtual HRESULT __stdcall DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject);
+ virtual ULONG __stdcall DtcCallbackHandler::AddRef();
+ virtual ULONG __stdcall DtcCallbackHandler::Release();
+ void __stdcall AbortRequestDone();
+};
+
+
+HRESULT DtcCallbackHandler::PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase)
+{
+ if (singlePhase) {
+ return managedCallback(DTC_SINGLE_PHASE) ? S_OK : E_FAIL;
+ }
+
+ return managedCallback(DTC_PREPARE) ? S_OK : E_FAIL;
+}
+
+
+HRESULT DtcCallbackHandler::CommitRequest(DWORD grfrm, XACTUOW *unused)
+{
+ return managedCallback(DTC_COMMIT) ? S_OK : E_FAIL;
+}
+
+HRESULT DtcCallbackHandler::AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3)
+{
+ return managedCallback(DTC_ABORT) ? S_OK : E_FAIL;
+}
+
+
+HRESULT DtcCallbackHandler::TMDown()
+{
+ return managedCallback(DTC_TMDOWN) ? S_OK : E_FAIL;
+}
+
+
+HRESULT DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject)
+{
+ *ppvObject = NULL;
+
+ if ((riid == IID_IUnknown) || (riid == IID_IResourceManagerSink))
+ *ppvObject = this;
+ else
+ return ResultFromScode(E_NOINTERFACE);
+
+ this->AddRef();
+ return S_OK;
+}
+
+
+ULONG DtcCallbackHandler::AddRef()
+{
+ return InterlockedIncrement(&useCount);
+}
+
+
+ULONG DtcCallbackHandler::Release()
+{
+ long uc = InterlockedDecrement(&useCount);
+
+ if (uc)
+ return uc;
+
+ delete this;
+ return 0;
+}
+
+
+// ------------------------------------------------------------------------
+// End of pure native code section
+#pragma managed
+// ------------------------------------------------------------------------
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+void XaTransaction::ForceRecovery() {
+ debugFailMode = true;
+}
+#endif
+
+// ------------------------------------------------------------------------
+// ------------------------------------------------------------------------
+
+
+XaTransaction::XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *xaHelperp, DWORD rmCookie, DtxResourceManager^ rm) {
+ bool success = false;
+ xidp = NULL;
+ commandCompletionp = NULL;
+ firstDtxStartCompletionp = NULL;
+ nativeHandler = NULL;
+ resourceManager = rm;
+ controlSession = rm->DtxControlSession;
+ active = true;
+ preparing = false;
+ systemTransaction = t;
+ IntPtr comTxp = IntPtr::Zero;
+ completionHandle = gcnew ManualResetEvent(false);
+
+ try {
+ enlistedSessions = gcnew Collections::Generic::List<AmqpSession^>();
+
+ // take a System.Transactions.Transaction and obtain
+ // the corresponding DTC COM object.
+ IDtcTransaction^ dtcTransaction = TransactionInterop::GetDtcTransaction(t);
+ comTxp = Marshal::GetIUnknownForObject(dtcTransaction);
+ XID winXid;
+ HRESULT hr = xaHelperp->ConvertTridToXID((DWORD *)comTxp.ToPointer(), rmCookie, &winXid);
+ if (hr != S_OK)
+ throw gcnew QpidException("get XA XID");
+
+ // Convert the X/Open format to the internal Qpid format
+ xidp = new qpid::framing::Xid();
+ xidp->setFormat((uint32_t) winXid.formatID);
+ int bqualPos = 0;
+ if (winXid.gtrid_length > 0) {
+ xidp->setGlobalId(std::string(winXid.data, winXid.gtrid_length));
+ bqualPos = winXid.gtrid_length;
+ }
+ if (winXid.bqual_length > 0) {
+ xidp->setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length));
+ }
+
+ // create the callback chain: DTC proxy -> DtcCallbackHandler -> this
+ inboundDelegate = gcnew DtcCallbackDelegate(this, &XaTransaction::DtcCallback);
+ IntPtr ip = Marshal::GetFunctionPointerForDelegate(inboundDelegate);
+ nativeHandler = new DtcCallbackHandler(static_cast<DtcCallbackFp>(ip.ToPointer()));
+ // add myself for later smart pointer destruction
+ nativeHandler->AddRef();
+
+ hr = xaHelperp->EnlistWithRM(rmCookie, (ITransaction *)comTxp.ToPointer(), nativeHandler, &(nativeHandler->txHandle));
+
+ if (hr != S_OK)
+ throw gcnew QpidException("Enlist");
+
+ success = true;
+ }
+ finally {
+ if (!success)
+ Cleanup();
+ if (comTxp != IntPtr::Zero)
+ ((IUnknown *) comTxp.ToPointer())->Release();
+ }
+}
+
+
+void XaTransaction::Cleanup() {
+ if (firstDtxStartCompletionp != NULL) {
+ try {
+ firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+
+ firstDtxStartCompletionp = NULL;
+ }
+
+ if (nativeHandler != NULL) {
+ nativeHandler->Release();
+ nativeHandler = NULL;
+ }
+ if (xidp != NULL) {
+ delete xidp;
+ xidp = NULL;
+ }
+}
+
+
+XaTransaction^ XaTransaction::Enlist (AmqpSession ^session) {
+ lock l(enlistedSessions);
+ if (!active)
+ throw gcnew QpidException("transaction enlistment internal error");
+ if (!enlistedSessions->Contains(session)) {
+ enlistedSessions->Add(session);
+ if (firstEnlistedSession == nullptr) {
+ firstEnlistedSession = session;
+ IntPtr intptr = session->DtxStart((IntPtr) xidp, false, false);
+ firstDtxStartCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ }
+ else {
+ // the broker must see the dtxStart as a join operation, and it must arrive
+ // at the broker after the first dtx start
+ if (firstDtxStartCompletionp != NULL)
+ firstDtxStartCompletionp->wait();
+ session->DtxStart((IntPtr) xidp, true, false);
+ }
+ }
+ else {
+ // already started once, so resume is true
+ session->DtxStart((IntPtr) xidp, false, true);
+ }
+ return this;
+}
+
+
+void XaTransaction::SessionClosing(AmqpSession^ session) {
+ lock l(enlistedSessions);
+ if (!enlistedSessions->Contains(session))
+ return;
+
+ enlistedSessions->Remove(session);
+ if (!active) {
+ // Phase0Flush already done on all sessions
+ l.release();
+ return;
+ }
+
+ IntPtr completion = session->BeginPhase0Flush(this);
+ session->EndPhase0Flush(this, completion);
+
+ if (session == firstEnlistedSession) {
+ // if we just completed the dtxEnd, we know the dtxStart completed before that
+ if (firstDtxStartCompletionp != NULL) {
+ firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
+ firstDtxStartCompletionp = NULL;
+ }
+ }
+}
+
+
+void XaTransaction::Phase0Flush() {
+ // let each session delimit their transactional work with an AMQP dtx.end protocol frame
+ lock l(enlistedSessions);
+ if (!active)
+ return;
+
+ active = false; // no more enlistments
+ int scount = enlistedSessions->Count;
+
+ if (scount > 0) {
+ array<IntPtr> ^completions = gcnew array<IntPtr>(scount);
+ for (int i = 0; i < scount; i++) {
+
+ // TODO: skip phase0 flush for rollback case
+
+ completions[i] = enlistedSessions[i]->BeginPhase0Flush(this);
+ }
+
+ for (int i = 0; i < scount; i++) {
+ // without each session.sync(), session commands are queued up in the right order,
+ // but on their separate outbound channels, and destined for receipt at separate Broker inbound
+ // channels. It is not clear how to be sure Phase 0 dtx.End is processed in the
+ // correct order before commit on the broker without the sync.
+ enlistedSessions[i]->EndPhase0Flush(this, completions[i]);
+ }
+ }
+
+ // since all dtxEnds have completed, we know all starts have too
+ if (firstDtxStartCompletionp != NULL) {
+ try {
+ firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+
+ firstDtxStartCompletionp = NULL;
+ }
+}
+
+
+bool XaTransaction::DtcCallback (DtcCallbackType callback) {
+ // called by the DTC proxy thread. Be brief and don't block (but Phase0Flush?)
+
+ if (AppDomain::CurrentDomain->IsFinalizingForUnload())
+ return false;
+
+ IntPtr intptr = IntPtr::Zero;
+ currentCommand = callback;
+
+ try {
+ switch (callback) {
+ case DTC_PREPARE:
+ Phase0Flush();
+ try {
+ intptr = controlSession->DtxPrepare((IntPtr) xidp);
+ preparing = true;
+ resourceManager->IncrementDoubt();
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_COMMIT:
+#ifdef QPID_RECOVERY_TEST_HOOK
+ if (debugFailMode){ return; }
+#endif
+ // no phase 0 required. always preceded by a prepare
+ try {
+ intptr = controlSession->DtxCommit((IntPtr) xidp, false);
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_ABORT:
+ Phase0Flush();
+#ifdef QPID_RECOVERY_TEST_HOOK
+ if (debugFailMode){ return; }
+#endif
+ try {
+ intptr = controlSession->DtxRollback((IntPtr) xidp);
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_SINGLE_PHASE:
+ Phase0Flush();
+ try {
+ intptr = controlSession->DtxCommit((IntPtr) xidp, true);
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_TMDOWN:
+ commandCompletionp = NULL;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+ }
+ return true;
+ }
+ catch (System::Exception^ e) {
+ // TODO: log it
+ Console::WriteLine("Unexpected DtcCallback exception: {0}", e->ToString());
+ }
+ catch (...) {
+ // TODO: log it
+ }
+ return false;
+}
+
+
+// this handles the case where the application regains control for
+// a new transaction before we are notified (abort/rollback
+// optimization in DTC).
+
+void XaTransaction::NotifyPhase0() {
+ if (active)
+ Phase0Flush();
+}
+
+
+void XaTransaction::AsyncCompleter(Object ^unused) {
+ bool success = false;
+
+ if (commandCompletionp != NULL) {
+ try {
+ // waits for the AMQP broker's response and returns the decoded content
+ XaResult& xaResult = commandCompletionp->get();
+ if (xaResult.hasStatus()) {
+ if (xaResult.getStatus() == XaStatus::XA_STATUS_XA_OK) {
+ success = true;
+ }
+ }
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+ try {
+ controlSession->ReleaseCompletion((IntPtr) commandCompletionp);
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+
+ commandCompletionp = NULL;
+ }
+
+ ITransactionEnlistmentAsync *dtcTxHandle = nativeHandler->txHandle;
+
+ HRESULT hr = success ? S_OK : E_FAIL;
+
+ switch (currentCommand) {
+ case DTC_PREPARE:
+ dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
+ break;
+
+ case DTC_COMMIT:
+ dtcTxHandle->CommitRequestDone(hr);
+ if (success)
+ resourceManager->DecrementDoubt();
+ Complete();
+ break;
+
+ case DTC_ABORT:
+ dtcTxHandle->AbortRequestDone(hr);
+ if (success) {
+ if (preparing) {
+ preparing = false;
+ resourceManager->DecrementDoubt();
+ }
+ }
+ Complete();
+ break;
+
+ case DTC_SINGLE_PHASE:
+ if (success)
+ hr = XACT_S_SINGLEPHASE;
+ dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
+ Complete();
+ break;
+
+ case DTC_TMDOWN:
+ // Stop the RM from accepting new enlistments
+ resourceManager->TmDown();
+ Complete();
+ break;
+ }
+}
+
+
+void XaTransaction::Complete() {
+ Cleanup();
+ resourceManager->Complete(systemTransaction);
+ completionHandle->Set();
+}
+
+
+void XaTransaction::WaitForCompletion() {
+ completionHandle->WaitOne();
+}
+
+
+ /*
+void XaTransaction::WaitForFlush() {
+ isFlushedHandle->WaitOne();
+}
+ */
+
+// called from DtxResourceManager Finalize
+
+void XaTransaction::ChildFinalize() {
+ lock l(enlistedSessions);
+ Phase0Flush();
+ Cleanup();
+}
+
+
+
+}}} // namespace Apache::Qpid::Interop