summaryrefslogtreecommitdiff
path: root/trunk/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp')
-rw-r--r--trunk/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp525
1 files changed, 0 insertions, 525 deletions
diff --git a/trunk/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp b/trunk/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
deleted file mode 100644
index 23743316ff..0000000000
--- a/trunk/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
+++ /dev/null
@@ -1,525 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-#include <windows.h>
-#include <msclr\lock.h>
-#include <transact.h>
-#include <xolehlp.h>
-#include <txdtc.h>
-#include <oletx2xa.h>
-#include <iostream>
-#include <fstream>
-
-#include "qpid/client/AsyncSession.h"
-#include "qpid/client/SubscriptionManager.h"
-#include "qpid/client/Connection.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/Xid.h"
-
-#include "QpidException.h"
-#include "AmqpConnection.h"
-#include "AmqpSession.h"
-#include "DtxResourceManager.h"
-#include "XaTransaction.h"
-
-namespace Apache {
-namespace Qpid {
-namespace Interop {
-
-using namespace System;
-using namespace System::Runtime::InteropServices;
-using namespace System::Transactions;
-using namespace msclr;
-
-using namespace qpid::framing::dtx;
-
-// ------------------------------------------------------------------------
-// Start of a pure native code section
-#pragma unmanaged
-// ------------------------------------------------------------------------
-
-// This is the native COM object the DTC expects to talk to for coordination.
-// There is exactly one native instance of this for each managed XaTransaction object.
-
-
-class DtcCallbackHandler : public ITransactionResourceAsync
-{
-private:
- long useCount;
- DtcCallbackFp managedCallback;
-public:
- ITransactionEnlistmentAsync *txHandle;
- DtcCallbackHandler(DtcCallbackFp cbp) : managedCallback(cbp), useCount(0) {}
- ~DtcCallbackHandler() {}
- virtual HRESULT __stdcall PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase);
- virtual HRESULT __stdcall CommitRequest(DWORD grfrm, XACTUOW *unused);
- virtual HRESULT __stdcall AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3);
-
- virtual HRESULT __stdcall TMDown();
- virtual HRESULT __stdcall DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject);
- virtual ULONG __stdcall DtcCallbackHandler::AddRef();
- virtual ULONG __stdcall DtcCallbackHandler::Release();
- void __stdcall AbortRequestDone();
-};
-
-
-HRESULT DtcCallbackHandler::PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase)
-{
- if (singlePhase) {
- return managedCallback(DTC_SINGLE_PHASE) ? S_OK : E_FAIL;
- }
-
- return managedCallback(DTC_PREPARE) ? S_OK : E_FAIL;
-}
-
-
-HRESULT DtcCallbackHandler::CommitRequest(DWORD grfrm, XACTUOW *unused)
-{
- return managedCallback(DTC_COMMIT) ? S_OK : E_FAIL;
-}
-
-HRESULT DtcCallbackHandler::AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3)
-{
- return managedCallback(DTC_ABORT) ? S_OK : E_FAIL;
-}
-
-
-HRESULT DtcCallbackHandler::TMDown()
-{
- return managedCallback(DTC_TMDOWN) ? S_OK : E_FAIL;
-}
-
-
-HRESULT DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject)
-{
- *ppvObject = NULL;
-
- if ((riid == IID_IUnknown) || (riid == IID_IResourceManagerSink))
- *ppvObject = this;
- else
- return ResultFromScode(E_NOINTERFACE);
-
- this->AddRef();
- return S_OK;
-}
-
-
-ULONG DtcCallbackHandler::AddRef()
-{
- return InterlockedIncrement(&useCount);
-}
-
-
-ULONG DtcCallbackHandler::Release()
-{
- long uc = InterlockedDecrement(&useCount);
-
- if (uc)
- return uc;
-
- delete this;
- return 0;
-}
-
-
-// ------------------------------------------------------------------------
-// End of pure native code section
-#pragma managed
-// ------------------------------------------------------------------------
-
-#ifdef QPID_RECOVERY_TEST_HOOK
-void XaTransaction::ForceRecovery() {
- debugFailMode = true;
-}
-#endif
-
-// ------------------------------------------------------------------------
-// ------------------------------------------------------------------------
-
-
-XaTransaction::XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *xaHelperp, DWORD rmCookie, DtxResourceManager^ rm) {
- bool success = false;
- xidp = NULL;
- commandCompletionp = NULL;
- firstDtxStartCompletionp = NULL;
- nativeHandler = NULL;
- resourceManager = rm;
- controlSession = rm->DtxControlSession;
- active = true;
- preparing = false;
- systemTransaction = t;
- IntPtr comTxp = IntPtr::Zero;
- completionHandle = gcnew ManualResetEvent(false);
-
- try {
- enlistedSessions = gcnew Collections::Generic::List<AmqpSession^>();
-
- // take a System.Transactions.Transaction and obtain
- // the corresponding DTC COM object.
- IDtcTransaction^ dtcTransaction = TransactionInterop::GetDtcTransaction(t);
- comTxp = Marshal::GetIUnknownForObject(dtcTransaction);
- XID winXid;
- HRESULT hr = xaHelperp->ConvertTridToXID((DWORD *)comTxp.ToPointer(), rmCookie, &winXid);
- if (hr != S_OK)
- throw gcnew QpidException("get XA XID");
-
- // Convert the X/Open format to the internal Qpid format
- xidp = new qpid::framing::Xid();
- xidp->setFormat((uint32_t) winXid.formatID);
- int bqualPos = 0;
- if (winXid.gtrid_length > 0) {
- xidp->setGlobalId(std::string(winXid.data, winXid.gtrid_length));
- bqualPos = winXid.gtrid_length;
- }
- if (winXid.bqual_length > 0) {
- xidp->setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length));
- }
-
- // create the callback chain: DTC proxy -> DtcCallbackHandler -> this
- inboundDelegate = gcnew DtcCallbackDelegate(this, &XaTransaction::DtcCallback);
- IntPtr ip = Marshal::GetFunctionPointerForDelegate(inboundDelegate);
- nativeHandler = new DtcCallbackHandler(static_cast<DtcCallbackFp>(ip.ToPointer()));
- // add myself for later smart pointer destruction
- nativeHandler->AddRef();
-
- hr = xaHelperp->EnlistWithRM(rmCookie, (ITransaction *)comTxp.ToPointer(), nativeHandler, &(nativeHandler->txHandle));
-
- if (hr != S_OK)
- throw gcnew QpidException("Enlist");
-
- success = true;
- }
- finally {
- if (!success)
- Cleanup();
- if (comTxp != IntPtr::Zero)
- ((IUnknown *) comTxp.ToPointer())->Release();
- }
-}
-
-
-void XaTransaction::Cleanup() {
- if (firstDtxStartCompletionp != NULL) {
- try {
- firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
- }
- catch (...) {
- // TODO: log it?
- }
-
- firstDtxStartCompletionp = NULL;
- }
-
- if (nativeHandler != NULL) {
- nativeHandler->Release();
- nativeHandler = NULL;
- }
- if (xidp != NULL) {
- delete xidp;
- xidp = NULL;
- }
-}
-
-
-XaTransaction^ XaTransaction::Enlist (AmqpSession ^session) {
- lock l(enlistedSessions);
- if (!active)
- throw gcnew QpidException("transaction enlistment internal error");
- if (!enlistedSessions->Contains(session)) {
- enlistedSessions->Add(session);
- if (firstEnlistedSession == nullptr) {
- firstEnlistedSession = session;
- IntPtr intptr = session->DtxStart((IntPtr) xidp, false, false);
- firstDtxStartCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
- }
- else {
- // the broker must see the dtxStart as a join operation, and it must arrive
- // at the broker after the first dtx start
- if (firstDtxStartCompletionp != NULL)
- firstDtxStartCompletionp->wait();
- session->DtxStart((IntPtr) xidp, true, false);
- }
- }
- else {
- // already started once, so resume is true
- session->DtxStart((IntPtr) xidp, false, true);
- }
- return this;
-}
-
-
-void XaTransaction::SessionClosing(AmqpSession^ session) {
- lock l(enlistedSessions);
- if (!enlistedSessions->Contains(session))
- return;
-
- enlistedSessions->Remove(session);
- if (!active) {
- // Phase0Flush already done on all sessions
- l.release();
- return;
- }
-
- IntPtr completion = session->BeginPhase0Flush(this);
- session->EndPhase0Flush(this, completion);
-
- if (session == firstEnlistedSession) {
- // if we just completed the dtxEnd, we know the dtxStart completed before that
- if (firstDtxStartCompletionp != NULL) {
- firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
- firstDtxStartCompletionp = NULL;
- }
- }
-}
-
-
-void XaTransaction::Phase0Flush() {
- // let each session delimit their transactional work with an AMQP dtx.end protocol frame
- lock l(enlistedSessions);
- if (!active)
- return;
-
- active = false; // no more enlistments
- int scount = enlistedSessions->Count;
-
- if (scount > 0) {
- array<IntPtr> ^completions = gcnew array<IntPtr>(scount);
- for (int i = 0; i < scount; i++) {
-
- // TODO: skip phase0 flush for rollback case
-
- completions[i] = enlistedSessions[i]->BeginPhase0Flush(this);
- }
-
- for (int i = 0; i < scount; i++) {
- // without each session.sync(), session commands are queued up in the right order,
- // but on their separate outbound channels, and destined for receipt at separate Broker inbound
- // channels. It is not clear how to be sure Phase 0 dtx.End is processed in the
- // correct order before commit on the broker without the sync.
- enlistedSessions[i]->EndPhase0Flush(this, completions[i]);
- }
- }
-
- // since all dtxEnds have completed, we know all starts have too
- if (firstDtxStartCompletionp != NULL) {
- try {
- firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
- }
- catch (...) {
- // TODO: log it?
- }
-
- firstDtxStartCompletionp = NULL;
- }
-}
-
-
-bool XaTransaction::DtcCallback (DtcCallbackType callback) {
- // called by the DTC proxy thread. Be brief and don't block (but Phase0Flush?)
-
- if (AppDomain::CurrentDomain->IsFinalizingForUnload())
- return false;
-
- IntPtr intptr = IntPtr::Zero;
- currentCommand = callback;
-
- try {
- switch (callback) {
- case DTC_PREPARE:
- Phase0Flush();
- try {
- intptr = controlSession->DtxPrepare((IntPtr) xidp);
- preparing = true;
- resourceManager->IncrementDoubt();
- }
- catch (System::Exception^ ) {
- // intptr remains nullptr
- }
- commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
- ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
- break;
-
- case DTC_COMMIT:
-#ifdef QPID_RECOVERY_TEST_HOOK
- if (debugFailMode){ return; }
-#endif
- // no phase 0 required. always preceded by a prepare
- try {
- intptr = controlSession->DtxCommit((IntPtr) xidp, false);
- }
- catch (System::Exception^ ) {
- // intptr remains nullptr
- }
- commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
- ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
- break;
-
- case DTC_ABORT:
- Phase0Flush();
-#ifdef QPID_RECOVERY_TEST_HOOK
- if (debugFailMode){ return; }
-#endif
- try {
- intptr = controlSession->DtxRollback((IntPtr) xidp);
- }
- catch (System::Exception^ ) {
- // intptr remains nullptr
- }
- commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
- ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
- break;
-
- case DTC_SINGLE_PHASE:
- Phase0Flush();
- try {
- intptr = controlSession->DtxCommit((IntPtr) xidp, true);
- }
- catch (System::Exception^ ) {
- // intptr remains nullptr
- }
- commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
- ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
- break;
-
- case DTC_TMDOWN:
- commandCompletionp = NULL;
- ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
- break;
- }
- return true;
- }
- catch (System::Exception^ e) {
- // TODO: log it
- Console::WriteLine("Unexpected DtcCallback exception: {0}", e->ToString());
- }
- catch (...) {
- // TODO: log it
- }
- return false;
-}
-
-
-// this handles the case where the application regains control for
-// a new transaction before we are notified (abort/rollback
-// optimization in DTC).
-
-void XaTransaction::NotifyPhase0() {
- if (active)
- Phase0Flush();
-}
-
-
-void XaTransaction::AsyncCompleter(Object ^unused) {
- bool success = false;
-
- if (commandCompletionp != NULL) {
- try {
- // waits for the AMQP broker's response and returns the decoded content
- XaResult& xaResult = commandCompletionp->get();
- if (xaResult.hasStatus()) {
- if (xaResult.getStatus() == XaStatus::XA_STATUS_XA_OK) {
- success = true;
- }
- }
- }
- catch (...) {
- // TODO: log it?
- }
- try {
- controlSession->ReleaseCompletion((IntPtr) commandCompletionp);
- }
- catch (...) {
- // TODO: log it?
- }
-
- commandCompletionp = NULL;
- }
-
- ITransactionEnlistmentAsync *dtcTxHandle = nativeHandler->txHandle;
-
- HRESULT hr = success ? S_OK : E_FAIL;
-
- switch (currentCommand) {
- case DTC_PREPARE:
- dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
- break;
-
- case DTC_COMMIT:
- dtcTxHandle->CommitRequestDone(hr);
- if (success)
- resourceManager->DecrementDoubt();
- Complete();
- break;
-
- case DTC_ABORT:
- dtcTxHandle->AbortRequestDone(hr);
- if (success) {
- if (preparing) {
- preparing = false;
- resourceManager->DecrementDoubt();
- }
- }
- Complete();
- break;
-
- case DTC_SINGLE_PHASE:
- if (success)
- hr = XACT_S_SINGLEPHASE;
- dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
- Complete();
- break;
-
- case DTC_TMDOWN:
- // Stop the RM from accepting new enlistments
- resourceManager->TmDown();
- Complete();
- break;
- }
-}
-
-
-void XaTransaction::Complete() {
- Cleanup();
- resourceManager->Complete(systemTransaction);
- completionHandle->Set();
-}
-
-
-void XaTransaction::WaitForCompletion() {
- completionHandle->WaitOne();
-}
-
-
- /*
-void XaTransaction::WaitForFlush() {
- isFlushedHandle->WaitOne();
-}
- */
-
-// called from DtxResourceManager Finalize
-
-void XaTransaction::ChildFinalize() {
- lock l(enlistedSessions);
- Phase0Flush();
- Cleanup();
-}
-
-
-
-}}} // namespace Apache::Qpid::Interop