/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include #include #include #include #include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" #include "qpid/framing/FrameSet.h" #include "qpid/framing/Xid.h" #include "QpidException.h" #include "AmqpConnection.h" #include "AmqpSession.h" #include "DtxResourceManager.h" #include "XaTransaction.h" namespace Apache { namespace Qpid { namespace Interop { using namespace System; using namespace System::Runtime::InteropServices; using namespace System::Transactions; using namespace msclr; using namespace qpid::framing::dtx; // ------------------------------------------------------------------------ // Start of a pure native code section #pragma unmanaged // ------------------------------------------------------------------------ // This is the native COM object the DTC expects to talk to for coordination. // There is exactly one native instance of this for each managed XaTransaction object. class DtcCallbackHandler : public ITransactionResourceAsync { private: long useCount; DtcCallbackFp managedCallback; public: ITransactionEnlistmentAsync *txHandle; DtcCallbackHandler(DtcCallbackFp cbp) : managedCallback(cbp), useCount(0) {} ~DtcCallbackHandler() {} virtual HRESULT __stdcall PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase); virtual HRESULT __stdcall CommitRequest(DWORD grfrm, XACTUOW *unused); virtual HRESULT __stdcall AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3); virtual HRESULT __stdcall TMDown(); virtual HRESULT __stdcall DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject); virtual ULONG __stdcall DtcCallbackHandler::AddRef(); virtual ULONG __stdcall DtcCallbackHandler::Release(); void __stdcall AbortRequestDone(); }; HRESULT DtcCallbackHandler::PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase) { if (singlePhase) { return managedCallback(DTC_SINGLE_PHASE) ? S_OK : E_FAIL; } return managedCallback(DTC_PREPARE) ? S_OK : E_FAIL; } HRESULT DtcCallbackHandler::CommitRequest(DWORD grfrm, XACTUOW *unused) { return managedCallback(DTC_COMMIT) ? S_OK : E_FAIL; } HRESULT DtcCallbackHandler::AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3) { return managedCallback(DTC_ABORT) ? S_OK : E_FAIL; } HRESULT DtcCallbackHandler::TMDown() { return managedCallback(DTC_TMDOWN) ? S_OK : E_FAIL; } HRESULT DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject) { *ppvObject = NULL; if ((riid == IID_IUnknown) || (riid == IID_IResourceManagerSink)) *ppvObject = this; else return ResultFromScode(E_NOINTERFACE); this->AddRef(); return S_OK; } ULONG DtcCallbackHandler::AddRef() { return InterlockedIncrement(&useCount); } ULONG DtcCallbackHandler::Release() { long uc = InterlockedDecrement(&useCount); if (uc) return uc; delete this; return 0; } // ------------------------------------------------------------------------ // End of pure native code section #pragma managed // ------------------------------------------------------------------------ #ifdef QPID_RECOVERY_TEST_HOOK void XaTransaction::ForceRecovery() { debugFailMode = true; } #endif // ------------------------------------------------------------------------ // ------------------------------------------------------------------------ XaTransaction::XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *xaHelperp, DWORD rmCookie, DtxResourceManager^ rm) { bool success = false; xidp = NULL; commandCompletionp = NULL; firstDtxStartCompletionp = NULL; nativeHandler = NULL; resourceManager = rm; controlSession = rm->DtxControlSession; active = true; preparing = false; systemTransaction = t; IntPtr comTxp = IntPtr::Zero; completionHandle = gcnew ManualResetEvent(false); try { enlistedSessions = gcnew Collections::Generic::List(); // take a System.Transactions.Transaction and obtain // the corresponding DTC COM object. IDtcTransaction^ dtcTransaction = TransactionInterop::GetDtcTransaction(t); comTxp = Marshal::GetIUnknownForObject(dtcTransaction); XID winXid; HRESULT hr = xaHelperp->ConvertTridToXID((DWORD *)comTxp.ToPointer(), rmCookie, &winXid); if (hr != S_OK) throw gcnew QpidException("get XA XID"); // Convert the X/Open format to the internal Qpid format xidp = new qpid::framing::Xid(); xidp->setFormat((uint32_t) winXid.formatID); int bqualPos = 0; if (winXid.gtrid_length > 0) { xidp->setGlobalId(std::string(winXid.data, winXid.gtrid_length)); bqualPos = winXid.gtrid_length; } if (winXid.bqual_length > 0) { xidp->setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length)); } // create the callback chain: DTC proxy -> DtcCallbackHandler -> this inboundDelegate = gcnew DtcCallbackDelegate(this, &XaTransaction::DtcCallback); IntPtr ip = Marshal::GetFunctionPointerForDelegate(inboundDelegate); nativeHandler = new DtcCallbackHandler(static_cast(ip.ToPointer())); // add myself for later smart pointer destruction nativeHandler->AddRef(); hr = xaHelperp->EnlistWithRM(rmCookie, (ITransaction *)comTxp.ToPointer(), nativeHandler, &(nativeHandler->txHandle)); if (hr != S_OK) throw gcnew QpidException("Enlist"); success = true; } finally { if (!success) Cleanup(); if (comTxp != IntPtr::Zero) ((IUnknown *) comTxp.ToPointer())->Release(); } } void XaTransaction::Cleanup() { if (firstDtxStartCompletionp != NULL) { try { firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp); } catch (...) { // TODO: log it? } firstDtxStartCompletionp = NULL; } if (nativeHandler != NULL) { nativeHandler->Release(); nativeHandler = NULL; } if (xidp != NULL) { delete xidp; xidp = NULL; } } XaTransaction^ XaTransaction::Enlist (AmqpSession ^session) { lock l(enlistedSessions); if (!active) throw gcnew QpidException("transaction enlistment internal error"); if (!enlistedSessions->Contains(session)) { enlistedSessions->Add(session); if (firstEnlistedSession == nullptr) { firstEnlistedSession = session; IntPtr intptr = session->DtxStart((IntPtr) xidp, false, false); firstDtxStartCompletionp = (TypedResult *) intptr.ToPointer(); } else { // the broker must see the dtxStart as a join operation, and it must arrive // at the broker after the first dtx start if (firstDtxStartCompletionp != NULL) firstDtxStartCompletionp->wait(); session->DtxStart((IntPtr) xidp, true, false); } } else { // already started once, so resume is true session->DtxStart((IntPtr) xidp, false, true); } return this; } void XaTransaction::SessionClosing(AmqpSession^ session) { lock l(enlistedSessions); if (!enlistedSessions->Contains(session)) return; enlistedSessions->Remove(session); if (!active) { // Phase0Flush already done on all sessions l.release(); return; } IntPtr completion = session->BeginPhase0Flush(this); session->EndPhase0Flush(this, completion); if (session == firstEnlistedSession) { // if we just completed the dtxEnd, we know the dtxStart completed before that if (firstDtxStartCompletionp != NULL) { firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp); firstDtxStartCompletionp = NULL; } } } void XaTransaction::Phase0Flush() { // let each session delimit their transactional work with an AMQP dtx.end protocol frame lock l(enlistedSessions); if (!active) return; active = false; // no more enlistments int scount = enlistedSessions->Count; if (scount > 0) { array ^completions = gcnew array(scount); for (int i = 0; i < scount; i++) { // TODO: skip phase0 flush for rollback case completions[i] = enlistedSessions[i]->BeginPhase0Flush(this); } for (int i = 0; i < scount; i++) { // without each session.sync(), session commands are queued up in the right order, // but on their separate outbound channels, and destined for receipt at separate Broker inbound // channels. It is not clear how to be sure Phase 0 dtx.End is processed in the // correct order before commit on the broker without the sync. enlistedSessions[i]->EndPhase0Flush(this, completions[i]); } } // since all dtxEnds have completed, we know all starts have too if (firstDtxStartCompletionp != NULL) { try { firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp); } catch (...) { // TODO: log it? } firstDtxStartCompletionp = NULL; } } bool XaTransaction::DtcCallback (DtcCallbackType callback) { // called by the DTC proxy thread. Be brief and don't block (but Phase0Flush?) if (AppDomain::CurrentDomain->IsFinalizingForUnload()) return false; IntPtr intptr = IntPtr::Zero; currentCommand = callback; try { switch (callback) { case DTC_PREPARE: Phase0Flush(); try { intptr = controlSession->DtxPrepare((IntPtr) xidp); preparing = true; resourceManager->IncrementDoubt(); } catch (System::Exception^ ) { // intptr remains nullptr } commandCompletionp = (TypedResult *) intptr.ToPointer(); ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); break; case DTC_COMMIT: #ifdef QPID_RECOVERY_TEST_HOOK if (debugFailMode){ return; } #endif // no phase 0 required. always preceded by a prepare try { intptr = controlSession->DtxCommit((IntPtr) xidp, false); } catch (System::Exception^ ) { // intptr remains nullptr } commandCompletionp = (TypedResult *) intptr.ToPointer(); ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); break; case DTC_ABORT: Phase0Flush(); #ifdef QPID_RECOVERY_TEST_HOOK if (debugFailMode){ return; } #endif try { intptr = controlSession->DtxRollback((IntPtr) xidp); } catch (System::Exception^ ) { // intptr remains nullptr } commandCompletionp = (TypedResult *) intptr.ToPointer(); ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); break; case DTC_SINGLE_PHASE: Phase0Flush(); try { intptr = controlSession->DtxCommit((IntPtr) xidp, true); } catch (System::Exception^ ) { // intptr remains nullptr } commandCompletionp = (TypedResult *) intptr.ToPointer(); ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); break; case DTC_TMDOWN: commandCompletionp = NULL; ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); break; } return true; } catch (System::Exception^ e) { // TODO: log it Console::WriteLine("Unexpected DtcCallback exception: {0}", e->ToString()); } catch (...) { // TODO: log it } return false; } // this handles the case where the application regains control for // a new transaction before we are notified (abort/rollback // optimization in DTC). void XaTransaction::NotifyPhase0() { if (active) Phase0Flush(); } void XaTransaction::AsyncCompleter(Object ^unused) { bool success = false; if (commandCompletionp != NULL) { try { // waits for the AMQP broker's response and returns the decoded content XaResult& xaResult = commandCompletionp->get(); if (xaResult.hasStatus()) { if (xaResult.getStatus() == XaStatus::XA_STATUS_XA_OK) { success = true; } } } catch (...) { // TODO: log it? } try { controlSession->ReleaseCompletion((IntPtr) commandCompletionp); } catch (...) { // TODO: log it? } commandCompletionp = NULL; } ITransactionEnlistmentAsync *dtcTxHandle = nativeHandler->txHandle; HRESULT hr = success ? S_OK : E_FAIL; switch (currentCommand) { case DTC_PREPARE: dtcTxHandle->PrepareRequestDone(hr, NULL, NULL); break; case DTC_COMMIT: dtcTxHandle->CommitRequestDone(hr); if (success) resourceManager->DecrementDoubt(); Complete(); break; case DTC_ABORT: dtcTxHandle->AbortRequestDone(hr); if (success) { if (preparing) { preparing = false; resourceManager->DecrementDoubt(); } } Complete(); break; case DTC_SINGLE_PHASE: if (success) hr = XACT_S_SINGLEPHASE; dtcTxHandle->PrepareRequestDone(hr, NULL, NULL); Complete(); break; case DTC_TMDOWN: // Stop the RM from accepting new enlistments resourceManager->TmDown(); Complete(); break; } } void XaTransaction::Complete() { Cleanup(); resourceManager->Complete(systemTransaction); completionHandle->Set(); } void XaTransaction::WaitForCompletion() { completionHandle->WaitOne(); } /* void XaTransaction::WaitForFlush() { isFlushedHandle->WaitOne(); } */ // called from DtxResourceManager Finalize void XaTransaction::ChildFinalize() { lock l(enlistedSessions); Phase0Flush(); Cleanup(); } }}} // namespace Apache::Qpid::Interop