summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-12-17 16:03:54 +0000
committerStephen D. Huston <shuston@apache.org>2009-12-17 16:03:54 +0000
commit69d56a233cd6dbe02287c8a237a18fbc7505f140 (patch)
tree3e9198c1fe50ba7574afb1a718307c165739d180
parent62a08a90f3d13e1618a4dceeecef1242ba820dd7 (diff)
downloadqpid-python-69d56a233cd6dbe02287c8a237a18fbc7505f140.tar.gz
Apply patches QPID-2128-2.patch, cppbld.patch and interop.tx.patch; resolves QPID-2128.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@891783 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt9
-rw-r--r--wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs7
-rw-r--r--wcf/src/Apache/Qpid/Channel/Channel.csproj1
-rw-r--r--wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp664
-rw-r--r--wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp14
-rw-r--r--wcf/src/Apache/Qpid/Interop/AmqpConnection.h26
-rw-r--r--wcf/src/Apache/Qpid/Interop/AmqpSession.cpp393
-rw-r--r--wcf/src/Apache/Qpid/Interop/AmqpSession.h38
-rw-r--r--wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp285
-rw-r--r--wcf/src/Apache/Qpid/Interop/DtxResourceManager.h76
-rw-r--r--wcf/src/Apache/Qpid/Interop/InputLink.cpp86
-rw-r--r--wcf/src/Apache/Qpid/Interop/InputLink.h2
-rw-r--r--wcf/src/Apache/Qpid/Interop/Interop.vcproj23
-rw-r--r--wcf/src/Apache/Qpid/Interop/XaTransaction.cpp525
-rw-r--r--wcf/src/Apache/Qpid/Interop/XaTransaction.h96
15 files changed, 2162 insertions, 83 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index b7d96ba273..899dd5568c 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -693,6 +693,15 @@ if (WIN32)
COMPONENT ${QPID_COMPONENT_CLIENT})
endif (WIN32)
+if (WIN32)
+ set(AMQP_WCF_DIR ${qpid-cpp_SOURCE_DIR}/../wcf)
+ set(DTC_PLUGIN_SOURCE ${AMQP_WCF_DIR}/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp)
+ if (EXISTS ${DTC_PLUGIN_SOURCE})
+ add_library (qpidxarm SHARED ${DTC_PLUGIN_SOURCE})
+ target_link_libraries (qpidxarm qpidclient qpidcommon)
+ endif (EXISTS ${DTC_PLUGIN_SOURCE})
+endif (WIN32)
+
set (qpidbroker_SOURCES
${mgen_broker_cpp}
${qpidbroker_platform_SOURCES}
diff --git a/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
index 08c565af18..7993252309 100644
--- a/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
+++ b/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
@@ -25,7 +25,7 @@ namespace Apache.Qpid.Channel
using System.ServiceModel.Description;
using Apache.Qpid.AmqpTypes;
- public class AmqpTransportBindingElement : TransportBindingElement
+ public class AmqpTransportBindingElement : TransportBindingElement, ITransactedBindingElement
{
AmqpChannelProperties channelProperties;
bool shared;
@@ -112,6 +112,11 @@ namespace Apache.Qpid.Channel
set { this.shared = value; }
}
+ public bool TransactedReceiveEnabled
+ {
+ get { return true; }
+ }
+
public TransferMode TransferMode
{
get { return this.channelProperties.TransferMode; }
diff --git a/wcf/src/Apache/Qpid/Channel/Channel.csproj b/wcf/src/Apache/Qpid/Channel/Channel.csproj
index 7484bc38ac..ac90fb7d64 100644
--- a/wcf/src/Apache/Qpid/Channel/Channel.csproj
+++ b/wcf/src/Apache/Qpid/Channel/Channel.csproj
@@ -90,6 +90,7 @@ under the License.
<Reference Include="System.ServiceModel">
<RequiredTargetFramework>3.0</RequiredTargetFramework>
</Reference>
+ <Reference Include="System.Transactions" />
<Reference Include="System.XML" />
</ItemGroup>
<ItemGroup>
diff --git a/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp b/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
new file mode 100644
index 0000000000..f9d8bd8521
--- /dev/null
+++ b/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
@@ -0,0 +1,664 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+
+//
+// This module provides the backend recovery driver for Windows resource managers based on
+// the IDtcToXaHelperSinglePipe interface. The dll is loaded (LoadLibrary) directly into DTC
+// itself and runs at a different protection level from the resource manager instance, which
+// runs inside the application.
+//
+// The DTC dynamically loads this file, calls GetXaSwitch() to access the XA interface
+// implementation and unloads the dll when done.
+//
+// This DTC plugin is only called for registration and recovery. Each time the application
+// registers the Qpid resource manager with DTC, the plugin is loaded and a successful
+// connection via xa_open is confirmed before completing registration and saving the DSN
+// connection string in the DTC log for possible recovery. On recovery, the DSN is re-used to
+// restablish a new connection with the broker and perform recovery.
+//
+// Because this plugin is not involved in coordinating any active transactions it only needs to
+// partially implement the XA interface.
+//
+// For the same reason, the locking strategy is simple. A single global lock is used.
+// Whenever networking activity is about to take place, the lock is relinquished and retaken
+// soon thereafter.
+
+
+#include <windows.h>
+#include <transact.h>
+#include <xolehlp.h>
+#include <txdtc.h>
+#include <xa.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Connection.h"
+
+
+#include <map>
+#include <iostream>
+#include <fstream>
+
+namespace Apache {
+namespace Qpid {
+namespace DtcPlugin {
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::framing::dtx;
+
+class ResourceManager
+{
+private:
+ Connection qpidConnection;
+ Session qpidSession;
+ bool active;
+ std::string host;
+ int port;
+ int rmid;
+ std::vector<qpid::framing::Xid> inDoubtXids;
+ // current scan position, or -1 if no scan
+ int cursor;
+public:
+ ResourceManager(int id, std::string h, int p) : rmid(id), host(h), port(p), active(false), cursor(-1) {}
+ ~ResourceManager() {}
+ INT open();
+ INT close();
+ INT commit(XID *xid);
+ INT rollback(XID *xid);
+ INT recover(XID *xids, long count, long flags);
+};
+
+
+CRITICAL_SECTION rmLock;
+
+std::map<int, ResourceManager*> rmMap;
+HMODULE thisDll = NULL;
+bool memLocked = false;
+
+#define QPIDHMCHARS 512
+
+void pinDll() {
+ if (!memLocked) {
+ char thisDllName[QPIDHMCHARS];
+ HMODULE ignore;
+
+ DWORD nc = GetModuleFileName(thisDll, thisDllName, QPIDHMCHARS);
+ if ((nc > 0) && (nc < QPIDHMCHARS)) {
+ memLocked = GetModuleHandleEx(GET_MODULE_HANDLE_EX_FLAG_PIN, thisDllName, &ignore);
+ }
+ }
+}
+
+
+void XaToQpid(XID &winXid, Xid &qpidXid) {
+ // convert from XA defined structure XID to the Qpid framing structure
+ qpidXid.setFormat((uint32_t) winXid.formatID);
+ int bqualPos = 0;
+ if (winXid.gtrid_length > 0) {
+ qpidXid.setGlobalId(std::string(winXid.data, winXid.gtrid_length));
+ bqualPos = winXid.gtrid_length;
+ }
+ if (winXid.bqual_length > 0) {
+ qpidXid.setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length));
+ }
+}
+
+
+// this function assumes that the qpidXid has already been validated for the memory copy
+
+void QpidToXa(Xid &qpidXid, XID &winXid) {
+ // convert from the Qpid framing structure to the XA defined structure XID
+ winXid.formatID = qpidXid.getFormat();
+
+ const std::string& global_s = qpidXid.getGlobalId();
+ size_t gl = global_s.size();
+ winXid.gtrid_length = (long) gl;
+ if (gl > 0)
+ global_s.copy(winXid.data, gl);
+
+ const std::string branch_s = qpidXid.getBranchId();
+ size_t bl = branch_s.size();
+ winXid.bqual_length = (long) bl;
+ if (bl > 0)
+ branch_s.copy(winXid.data + gl, bl);
+}
+
+
+/* parse string from AmqpConnection.h
+
+ this info will eventually include authentication tokens
+
+ dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host,
+ System::Diagnostics::Process::GetCurrentProcess()->Id,
+ AppDomain::CurrentDomain->Id);
+*/
+
+bool parseDsn (const char *dsn, std::string& host, int& port) {
+ if (dsn == NULL)
+ return false;
+
+ size_t len = strnlen(dsn, 1025);
+ if (len > 1024)
+ return false;
+
+ int firstDot = 0;
+ for (int i = 0; i < len; i++)
+ if (dsn[i] == '.') {
+ firstDot = i;
+ break;
+ }
+ if (!firstDot)
+ return false;
+
+ // look for 2 dots side by side to indicate end of the host
+ int doubleDot = 0;
+ for (int i = firstDot + 1; i < (len - 1); i++)
+ if ((dsn[i] == '.') && (dsn[i+1] == '.')) {
+ doubleDot = i;
+ break;
+ }
+ if (!doubleDot)
+ return false;
+
+ port = 0;
+ for (int i = 0; i < firstDot; i++) {
+ char c = dsn[i];
+ if ((c < '0') || (c > '9'))
+ return false;
+ port = (10 * port) + (c - '0');
+ }
+
+ host.assign(dsn + firstDot + 1, (doubleDot - firstDot) - 1);
+ return true;
+}
+
+
+INT ResourceManager::open() {
+ INT rv = XAER_RMERR; // placeholder until we successfully connect to resource
+ active = true;
+ LeaveCriticalSection(&rmLock);
+
+ try {
+ qpidConnection.open(host, port);
+ qpidSession = qpidConnection.newSession();
+ rv = XA_OK;
+/*
+TODO: logging
+ } catch (const qpid::Exception& error) {
+ // log it
+ } catch (const std::exception& e2) {
+ // log it
+*/
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+ return rv;
+}
+
+
+INT ResourceManager::close() {
+ // should never be called when already sending other commands to broker
+ if (active)
+ return XAER_PROTO;
+
+ INT rv = XAER_RMERR; // placeholder until we successfully close resource
+ active = true;
+ LeaveCriticalSection(&rmLock);
+ try {
+ if (qpidSession.isValid()) {
+ qpidSession.close();
+ }
+ if (qpidConnection.isOpen()) {
+ qpidConnection.close();
+ }
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+
+ if (!qpidConnection.isOpen()) {
+ rv = XA_OK;
+ }
+ return rv;
+}
+
+
+INT ResourceManager::commit(XID *xid) {
+ if (active)
+ return XAER_PROTO;
+
+ INT rv = XAER_RMFAIL;
+ active = true;
+ LeaveCriticalSection(&rmLock);
+
+ try {
+ qpid::framing::Xid qpidXid;
+ XaToQpid(*xid, qpidXid);
+
+ XaResult xaResult = qpidSession.dtxCommit(qpidXid, false, true);
+ if (xaResult.hasStatus()) {
+ uint16_t status = xaResult.getStatus();
+ switch ((XaStatus) status) {
+ case XA_STATUS_XA_OK:
+ case XA_STATUS_XA_RDONLY:
+ case XA_STATUS_XA_HEURCOM:
+ rv = XA_OK;
+ break;
+
+ default:
+ // commit failed and a retry won't fix
+ rv = XAER_RMERR;
+ break;
+ }
+
+ }
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+ return rv;
+}
+
+
+INT ResourceManager::rollback(XID *xid) {
+ if (active)
+ return XAER_PROTO;
+
+ INT rv = XAER_RMFAIL;
+ active = true;
+ LeaveCriticalSection(&rmLock);
+
+ try {
+ qpid::framing::Xid qpidXid;
+ XaToQpid(*xid, qpidXid);
+
+ XaResult xaResult = qpidSession.dtxRollback(qpidXid, true);
+ if (xaResult.hasStatus()) {
+ uint16_t status = xaResult.getStatus();
+ switch ((XaStatus) status) {
+ case XA_STATUS_XA_OK:
+ case XA_STATUS_XA_HEURRB:
+ rv = XA_OK;
+ break;
+
+ default:
+ // RM internal error
+ rv = XA_RBPROTO;
+ break;
+ }
+ }
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+ return rv;
+}
+
+
+INT ResourceManager::recover(XID *xids, long count, long flags) {
+ if (active)
+ return XAER_PROTO;
+
+ if ((xids == NULL) && (count != 0))
+ return XAER_INVAL;
+
+ if (count < 0)
+ return XAER_INVAL;
+
+ if (!(flags & TMSTARTRSCAN) && (cursor == -1))
+ // no existing scan and no scan requested
+ return XAER_INVAL;
+
+ INT status = XA_OK;
+
+ if (flags & TMSTARTRSCAN) {
+ // start a fresh scan
+ cursor = -1;
+ inDoubtXids.clear();
+ active = true;
+ LeaveCriticalSection(&rmLock);
+
+ try {
+ // status if we can't talk to the broker
+ status = XAER_RMFAIL;
+ std::vector<std::string> wireFormatXids;
+
+ DtxRecoverResult dtxrr = qpidSession.dtxRecover(true);
+
+ // status if we can't process the xids
+ status = XAER_RMERR;
+ dtxrr.getInDoubt().collect(wireFormatXids);
+ size_t nXids = wireFormatXids.size();
+
+ if (nXids > 0) {
+ StructHelper decoder;
+ Xid qpidXid;
+ for (int i = 0; i < nXids; i++) {
+ decoder.decode (qpidXid, wireFormatXids[i]);
+ inDoubtXids.push_back(qpidXid);
+ }
+
+ // if we got here the decoder validated the Xids
+ status = XA_OK;
+
+ // make sure none are too big, just in case
+
+ for (int i = 0; i < nXids; i++) {
+ Xid& xid = inDoubtXids[i];
+ size_t l1 = xid.hasGlobalId() ? xid.getGlobalId().size() : 0;
+ size_t l2 = xid.hasBranchId() ? xid.getBranchId().size() : 0;
+ if ((l1 > MAXGTRIDSIZE) || (l2 > MAXBQUALSIZE) ||
+ ((l1 + l2) > XIDDATASIZE)) {
+ status = XAER_RMERR;
+ break;
+ }
+ }
+ }
+ else {
+ // nXids == 0, the previously cleared inDoubtXids is correctly populated
+ status = XA_OK;
+ }
+
+ if (status == XA_OK)
+ cursor = 0;
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+ }
+ else {
+ // TMSTARTRSCAN not set, is there an existing scan to work from?
+ if (cursor == -1)
+ return XAER_INVAL;
+ }
+
+ if (status != XA_OK)
+ return status;
+
+ INT actualCount = count;
+ if (count > 0) {
+ int nAvailable = (int) inDoubtXids.size() - cursor;
+ if (nAvailable < count)
+ actualCount = nAvailable;
+
+ for (int i = 0; i < actualCount; i++) {
+ Xid& qpidXid = inDoubtXids[i + cursor];
+ QpidToXa(qpidXid, xids[i]);
+ }
+ }
+
+ if (flags & TMENDRSCAN) {
+ cursor = -1;
+ inDoubtXids.clear();
+ }
+
+ return actualCount;
+}
+
+
+// Call with lock held
+
+ResourceManager* findRm(int rmid) {
+ if (rmMap.find(rmid) == rmMap.end()) {
+ return NULL;
+ }
+ return rmMap[rmid];
+}
+
+
+INT __cdecl xa_open (char *xa_info, int rmid, long flags) {
+ if (flags & TMASYNC)
+ return XAER_ASYNC;
+
+ INT rv = XAER_RMERR;
+ EnterCriticalSection(&rmLock);
+
+ ResourceManager* rmp = findRm(rmid);
+ if (rmp != NULL) {
+ // error: already in use
+ rv = XAER_PROTO;
+ }
+ else {
+ std::string brokerHost;
+ int brokerPort;
+ if (parseDsn(xa_info, brokerHost, brokerPort)) {
+
+ try {
+ rmp = new ResourceManager(rmid, brokerHost, brokerPort);
+
+ rv = rmp->open();
+ if (rv != XA_OK) {
+ delete (rmp);
+ }
+ else {
+ rmMap[rmid] = rmp;
+ }
+ } catch (...) {}
+ }
+ else {
+ rv = XAER_INVAL;
+ }
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_close (char *xa_info, int rmid, long flags) {
+ if (flags & TMASYNC)
+ return XAER_ASYNC;
+
+ INT rv = XAER_RMERR;
+
+ EnterCriticalSection(&rmLock);
+ ResourceManager* rmp = findRm(rmid);
+
+ if (rmp == NULL) {
+ // can close multiple times
+ rv = XA_OK;
+ }
+ else {
+ rv = rmp->close();
+ rmMap.erase(rmid);
+ try {
+ delete (rmp);
+ } catch (...) {
+ // TODO: log it
+ }
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_commit (XID *xid, int rmid, long flags) {
+ if (flags & TMASYNC)
+ return XAER_ASYNC;
+
+ INT rv = XAER_RMFAIL;
+
+ EnterCriticalSection(&rmLock);
+ ResourceManager* rmp = findRm(rmid);
+
+ if (rmp == NULL) {
+ rv = XAER_INVAL;
+ }
+ else {
+ rv = rmp->commit(xid);
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_rollback (XID *xid, int rmid, long flags) {
+ if (flags & TMASYNC)
+ return XAER_ASYNC;
+
+ INT rv = XAER_RMFAIL;
+
+ EnterCriticalSection(&rmLock);
+ ResourceManager* rmp = findRm(rmid);
+
+ if (rmp == NULL) {
+ rv = XAER_INVAL;
+ }
+ else {
+ rv = rmp->rollback(xid);
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_recover (XID *xids, long count, int rmid, long flags) {
+ INT rv = XAER_RMFAIL;
+
+ EnterCriticalSection(&rmLock);
+ ResourceManager* rmp = findRm(rmid);
+
+ if (rmp == NULL) {
+ rv = XAER_PROTO;
+ }
+ else {
+ rv = rmp->recover(xids, count, flags);
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_start (XID *xid, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+INT __cdecl xa_end (XID *xid, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+INT __cdecl xa_prepare (XID *xid, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+INT __cdecl xa_forget (XID *xid, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+INT __cdecl xa_complete (int *handle, int *retval, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+
+xa_switch_t xaSwitch;
+
+HRESULT __cdecl GetQpidXaSwitch (DWORD XaSwitchFlags, xa_switch_t ** ppXaSwitch)
+{
+ // needed for now due to implicit use of FreeLibrary in WSACleanup() in qpid/cpp/src/qpid/sys/windows/Socket.cpp
+ pinDll();
+
+ if (xaSwitch.xa_open_entry != xa_open) {
+
+ xaSwitch.xa_open_entry = xa_open;
+ xaSwitch.xa_close_entry = xa_close;
+ xaSwitch.xa_start_entry = xa_start;
+ xaSwitch.xa_end_entry = xa_end;
+ xaSwitch.xa_prepare_entry = xa_prepare;
+ xaSwitch.xa_commit_entry = xa_commit;
+ xaSwitch.xa_rollback_entry = xa_rollback;
+ xaSwitch.xa_recover_entry = xa_recover;
+ xaSwitch.xa_forget_entry = xa_forget;
+ xaSwitch.xa_complete_entry = xa_complete;
+
+ strcpy_s(xaSwitch.name, RMNAMESZ, "qpidxarm");
+ xaSwitch.flags = TMNOMIGRATE;
+ xaSwitch.version = 0;
+ }
+ *ppXaSwitch = &xaSwitch;
+ return S_OK;
+}
+
+
+
+
+}}} // namespace Apache::Qpid::DtcPlugin
+
+
+// GetXaSwitch
+
+extern "C" {
+
+ __declspec(dllexport) HRESULT __cdecl GetXaSwitch (DWORD XaSwitchFlags, xa_switch_t ** ppXaSwitch)
+ {
+ return Apache::Qpid::DtcPlugin::GetQpidXaSwitch (XaSwitchFlags, ppXaSwitch);
+ }
+}
+
+
+// dllmain
+
+BOOL APIENTRY DllMain( HMODULE hModule,
+ DWORD ul_reason_for_call,
+ LPVOID lpReserved)
+{
+
+ switch (ul_reason_for_call)
+ {
+ case DLL_PROCESS_ATTACH:
+ InitializeCriticalSection(&Apache::Qpid::DtcPlugin::rmLock);
+ Apache::Qpid::DtcPlugin::thisDll = hModule;
+ break;
+
+ case DLL_PROCESS_DETACH:
+ DeleteCriticalSection(&Apache::Qpid::DtcPlugin::rmLock);
+ break;
+
+ case DLL_THREAD_ATTACH:
+ case DLL_THREAD_DETACH:
+ break;
+ }
+ return TRUE;
+}
+
diff --git a/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
index 02d6c7ab18..c3afdf2280 100644
--- a/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
+++ b/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
@@ -19,6 +19,7 @@
#include <windows.h>
#include <msclr\lock.h>
+#include <oletx2xa.h>
#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
@@ -31,6 +32,8 @@
#include "AmqpSession.h"
#include "QpidMarshal.h"
#include "QpidException.h"
+#include "DtxResourceManager.h"
+#include "XaTransaction.h"
namespace Apache {
namespace Qpid {
@@ -66,6 +69,9 @@ AmqpConnection::AmqpConnection(String^ server, int port) :
success = true;
const ConnectionSettings& settings = connectionp->getNegotiatedSettings();
this->maxFrameSize = settings.maxFrameSize;
+ this->host = server;
+ this->port = port;
+ this->isOpen = true;
} catch (const qpid::Exception& error) {
String^ errmsg = gcnew String(error.what());
openException = gcnew QpidException(errmsg);
@@ -80,6 +86,12 @@ AmqpConnection::AmqpConnection(String^ server, int port) :
}
}
+AmqpConnection^ AmqpConnection::Clone() {
+ if (disposed)
+ throw gcnew ObjectDisposedException("AmqpConnection.Clone");
+ return gcnew AmqpConnection (this->host, this->port);
+}
+
void AmqpConnection::Cleanup()
{
{
@@ -91,6 +103,7 @@ void AmqpConnection::Cleanup()
try {
// let the child sessions clean up
+
for each(AmqpSession^ s in sessions) {
s->ConnectionClosed();
}
@@ -98,6 +111,7 @@ void AmqpConnection::Cleanup()
finally
{
if (connectionp != NULL) {
+ isOpen = false;
connectionp->close();
delete connectionp;
connectionp = NULL;
diff --git a/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
index 2641391e82..6533185fa1 100644
--- a/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
+++ b/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
@@ -28,6 +28,7 @@ using namespace std;
using namespace qpid::client;
ref class AmqpSession;
+ref class DtxResourceManager;
public delegate void ConnectionIdleEventHandler(Object^ sender, EventArgs^ eventArgs);
@@ -35,21 +36,44 @@ public ref class AmqpConnection
{
private:
Connection* connectionp;
- void Cleanup();
+ String^ host;
+ int port;
bool disposed;
Collections::Generic::List<AmqpSession^>^ sessions;
bool isOpen;
int busyCount;
int maxFrameSize;
+ DtxResourceManager^ dtxResourceManager;
+ void Cleanup();
+ // unique string used for distributed transactions
+ String^ dataSourceName;
internal:
void NotifyBusy();
void NotifyIdle();
+ AmqpConnection^ Clone();
property int MaxFrameSize {
int get () { return maxFrameSize; }
}
+ property DtxResourceManager^ CachedResourceManager {
+ DtxResourceManager^ get () { return dtxResourceManager; }
+ void set (DtxResourceManager^ value) { dtxResourceManager = value; }
+ }
+
+ property String^ DataSourceName {
+ // Note: any change to this format has to be reflected in the DTC plugin's xa_open()
+ String^ get() {
+ if (dataSourceName == nullptr) {
+ dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host,
+ System::Diagnostics::Process::GetCurrentProcess()->Id,
+ AppDomain::CurrentDomain->Id);
+ }
+ return dataSourceName;
+ }
+ }
+
public:
AmqpConnection(System::String^ server, int port);
~AmqpConnection();
diff --git a/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
index 425a592509..d2adb41205 100644
--- a/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
+++ b/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
@@ -19,6 +19,7 @@
#include <windows.h>
#include <msclr\lock.h>
+#include <oletx2xa.h>
#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
@@ -28,6 +29,7 @@
#include "qpid/client/Message.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/Future.h"
+#include "qpid/framing/Xid.h"
#include "AmqpConnection.h"
#include "AmqpSession.h"
@@ -37,6 +39,8 @@
#include "OutputLink.h"
#include "QpidMarshal.h"
#include "QpidException.h"
+#include "XaTransaction.h"
+#include "DtxResourceManager.h"
namespace Apache {
namespace Qpid {
@@ -44,6 +48,7 @@ namespace Interop {
using namespace System;
using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
using namespace msclr;
using namespace qpid::client;
@@ -55,15 +60,20 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon
sessionp(NULL),
sessionImplp(NULL),
subs_mgrp(NULL),
- openCount(0)
+ helperRunning(false),
+ openCount(0),
+ syncCount(0),
+ closing(false),
+ dtxEnabled(false)
{
bool success = false;
-
try {
sessionp = new qpid::client::AsyncSession;
*sessionp = qpidConnectionp->newSession();
subs_mgrp = new SubscriptionManager (*sessionp);
waiters = gcnew Collections::Generic::List<CompletionWaiter^>();
+ sessionLock = waiters; // waiters convenient and not publicly visible
+ openCloseLock = gcnew Object();
success = true;
} finally {
if (!success) {
@@ -77,29 +87,36 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon
void AmqpSession::Cleanup()
{
+ bool connected = connection->IsOpen;
+
if (subs_mgrp != NULL) {
- subs_mgrp->stop();
+ if (connected)
+ subs_mgrp->stop();
delete subs_mgrp;
subs_mgrp = NULL;
}
- if (localQueuep != NULL) {
- delete localQueuep;
- localQueuep = NULL;
- }
-
if (sessionp != NULL) {
- sessionp->close();
+ if (connected) {
+ sessionp->close();
+ }
delete sessionp;
sessionp = NULL;
sessionImplp = NULL;
}
+}
- if (connectionp != NULL) {
- connectionp->close();
- delete connectionp;
- connectionp = NULL;
- }
+
+static qpid::framing::Xid& getXid(XaTransaction^ xaTx)
+{
+ return *((qpid::framing::Xid *)xaTx->XidHandle.ToPointer());
+}
+
+
+void AmqpSession::CheckOpen()
+{
+ if (closing)
+ throw gcnew ObjectDisposedException("AmqpSession");
}
@@ -107,7 +124,41 @@ void AmqpSession::Cleanup()
void AmqpSession::ConnectionClosed()
{
- lock l(waiters);
+ lock l(sessionLock);
+
+ if (closing)
+ return;
+
+ closing = true;
+
+ if (connection->IsOpen) {
+ // send closing handshakes...
+
+ if (dtxEnabled) {
+ // session may close before all its transactions complete, at least force the phase 0 flush
+ if (pendingTransactions->Count > 0) {
+ array<XaTransaction^>^ txArray = pendingTransactions->ToArray();
+ l.release();
+ for each (XaTransaction^ xaTx in txArray) {
+ //xaTx->SessionClosing(this);
+ xaTx->WaitForCompletion();
+ }
+ l.acquire();
+ }
+ }
+
+ WaitLastSync (%l);
+ // Assert pendingTransactions->Count == 0
+
+ if (openXaTransaction != nullptr) {
+ // send final dtxend
+ sessionp->dtxEnd(getXid(openXaTransaction), false, true, false);
+ openXaTransaction = nullptr;
+ openSystemTransaction = nullptr;
+ // this operation will complete by the time Cleanup() returns
+ }
+ }
+
Cleanup();
}
@@ -119,10 +170,14 @@ InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue)
InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary,
System::String^ filterKey, System::String^ exchange)
{
+ lock ocl(openCloseLock);
+ lock l(sessionLock);
+ CheckOpen();
+
InputLink^ link = gcnew InputLink (this, sourceQueue, sessionp, subs_mgrp, exclusive, temporary, filterKey, exchange);
{
- lock l(waiters);
if (openCount == 0) {
+ l.release();
connection->NotifyBusy();
}
openCount++;
@@ -132,9 +187,11 @@ InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue, bool exclus
OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue)
{
- OutputLink^ link = gcnew OutputLink (this, targetQueue);
+ lock ocl(openCloseLock);
+ lock l(sessionLock);
+ CheckOpen();
- lock l(waiters);
+ OutputLink^ link = gcnew OutputLink (this, targetQueue);
if (sessionImplp == NULL) {
// not needed unless sending messages
@@ -144,6 +201,7 @@ OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue)
}
if (openCount == 0) {
+ l.release();
connection->NotifyBusy();
}
openCount++;
@@ -155,7 +213,7 @@ OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue)
// called whenever a child InputLink or OutputLink is closed or finalized
void AmqpSession::NotifyClosed()
{
- lock l(waiters);
+ lock ocl(openCloseLock);
openCount--;
if (openCount == 0) {
connection->NotifyIdle();
@@ -165,9 +223,14 @@ void AmqpSession::NotifyClosed()
CompletionWaiter^ AmqpSession::SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state)
{
- lock l(waiters);
- if (sessionp == NULL)
- throw gcnew ObjectDisposedException("Send");
+ lock l(sessionLock);
+
+ // delimit with session dtx commands depending on the transaction context
+ UpdateTransactionState(%l);
+
+ CheckOpen();
+
+ bool syncPending = false;
// create an AMQP message.transfer command to use with the partial frameset from the MessageBodyStream
@@ -191,26 +254,34 @@ CompletionWaiter^ AmqpSession::SendMessage (System::String^ queue, MessageBodySt
// waiter is responsible for releasing the Future native resource
futurep = NULL;
addWaiter(waiter);
- }
-
- l.release();
-
- if (waiter != nullptr)
return waiter;
+ }
// synchronous send with no timeout: no need to involve the asyncHelper thread
+ IncrementSyncs();
+ syncPending = true;
+ l.release();
internalWaitForCompletion((IntPtr) futurep);
}
finally {
+ if (syncPending) {
+ if (!l.is_locked())
+ l.acquire();
+ DecrementSyncs();
+ }
if (futurep != NULL)
delete (futurep);
}
return nullptr;
}
+
void AmqpSession::Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey)
{
+ lock l(sessionLock);
+ CheckOpen();
+
sessionp->exchangeBind(arg::queue=QpidMarshal::ToNative(queue),
arg::exchange=QpidMarshal::ToNative(exchange),
arg::bindingKey=QpidMarshal::ToNative(filterKey));
@@ -220,14 +291,8 @@ void AmqpSession::Bind(System::String^ queue, System::String^ exchange, System::
void AmqpSession::internalWaitForCompletion(IntPtr fp)
{
- lock l(waiters);
- if (sessionp == NULL)
- throw gcnew ObjectDisposedException("AmqpSession");
+ Debug::Assert(syncCount > 0, "sync counter mismatch");
- // increment the smart pointer count to sessionImplp to guard agains async close
- Session sessionCopy(*sessionp);
-
- l.release();
// Qpid native lib call to wait for the command completion
((Future *)fp.ToPointer())->wait(*sessionImplp);
}
@@ -235,6 +300,7 @@ void AmqpSession::internalWaitForCompletion(IntPtr fp)
// call with lock held
void AmqpSession::addWaiter(CompletionWaiter^ waiter)
{
+ IncrementSyncs();
waiters->Add(waiter);
if (!helperRunning) {
helperRunning = true;
@@ -247,13 +313,14 @@ void AmqpSession::removeWaiter(CompletionWaiter^ waiter)
{
// a waiter can be removed from anywhere in the list if timed out
- lock l(waiters);
+ lock l(sessionLock);
int idx = waiters->IndexOf(waiter);
if (idx == -1) {
// TODO: assert or log
}
else {
waiters->RemoveAt(idx);
+ DecrementSyncs();
}
}
@@ -262,7 +329,7 @@ void AmqpSession::removeWaiter(CompletionWaiter^ waiter)
void AmqpSession::asyncHelper(Object ^unused)
{
- lock l(waiters);
+ lock l(sessionLock);
while (true) {
if (waiters->Count == 0) {
@@ -279,27 +346,267 @@ void AmqpSession::asyncHelper(Object ^unused)
}
}
-bool AmqpSession::MessageStop(Completion &comp, std::string &name)
+bool AmqpSession::MessageStop(std::string &name)
{
- lock l(waiters);
+ lock l(sessionLock);
- if (sessionp == NULL)
+ if (closing)
return false;
- comp = sessionp->messageStop(name, true);
+ sessionp->messageStop(name, true);
return true;
}
void AmqpSession::AcceptAndComplete(SequenceSet& transfers)
{
- lock l(waiters);
+ lock l(sessionLock);
+
+ // delimit with session dtx commands depending on the transaction context
+ UpdateTransactionState(%l);
- if (sessionp == NULL)
- throw gcnew ObjectDisposedException("Accept");
+ CheckOpen();
sessionp->markCompleted(transfers, false);
sessionp->messageAccept(transfers, false);
}
+// call with session lock held
+
+void AmqpSession::UpdateTransactionState(lock^ slock)
+{
+ Transaction^ currentTx = Transaction::Current;
+ if ((currentTx == nullptr) && !dtxEnabled) {
+ // no transaction scope and no previous dtx work to monitor
+ return;
+ }
+
+ if (currentTx == openSystemTransaction) {
+ // no change
+ return;
+ }
+
+ if (!dtxEnabled) {
+ // AMQP requires that this be the first dtx-related command on the session
+ sessionp->dtxSelect(false);
+ dtxEnabled = true;
+ pendingTransactions = gcnew Collections::Generic::List<XaTransaction^>();
+ }
+
+ bool notify = false; // unless the System.Transaction is no longer active
+ XaTransaction^ oldXaTx = openXaTransaction;
+ if (openSystemTransaction != nullptr) {
+ // The application may start a new transaction before the phase0 on rollback
+ try {
+ if (openSystemTransaction->TransactionInformation->Status != TransactionStatus::Active) {
+ notify = true;
+ }
+ } catch (System::ObjectDisposedException^) {
+ notify = true;
+ }
+ }
+
+ slock->release();
+ // only use stack variables until lock re-acquired
+
+ if (notify) {
+ // will do call back to all enlisted sessions. call with session lock released.
+ // If NotifyPhase0() wins the race to start phase 0, openXaTransaction will be null
+ oldXaTx->NotifyPhase0();
+ }
+
+ XaTransaction^ newXaTx = nullptr;
+ if (currentTx != nullptr) {
+ // This must be called with locks released. The DTC and System.Transactions methods that
+ // will be called hold locks that interfere with the ITransactionResourceAsync callbacks.
+ newXaTx = DtxResourceManager::GetXaTransaction(this, currentTx);
+ }
+
+ slock->acquire();
+
+ if (closing)
+ return;
+
+ if (openSystemTransaction != nullptr) {
+ // some other transaction has the dtx window open
+ // close the XID window, suspend = true... in case it is used again
+ sessionp->dtxEnd(getXid(openXaTransaction), false, true, false);
+ openSystemTransaction = nullptr;
+ openXaTransaction = nullptr;
+ }
+
+
+ // Call enlist with session lock held. The XaTransaction will call DtxStart before returning.
+ if (newXaTx != nullptr) {
+ if (!pendingTransactions->Contains(newXaTx)) {
+ pendingTransactions->Add(newXaTx);
+ }
+
+ newXaTx->Enlist(this);
+ }
+
+ openXaTransaction = newXaTx;
+ openSystemTransaction = currentTx;
+}
+
+
+typedef TypedResult<qpid::framing::XaResult> XaResultCompletion;
+
+
+// send the required closing dtx.End before Phase 1
+
+IntPtr AmqpSession::BeginPhase0Flush(XaTransaction ^xaTx) {
+
+ lock l(sessionLock);
+ IntPtr completionp = IntPtr::Zero;
+ try {
+ if (sessionp != NULL) {
+
+ // proceed even if "closing == true", the phase 0 is part of the transition from closing to closed
+
+ if (xaTx != openXaTransaction) {
+ // a different transaction (or none) is in scope, so xaTx was previously suspended.
+ // must re-open it to close it properly
+ if (openXaTransaction != nullptr) {
+ // suspend the session's current pending transaction
+ // it wil be reopened in a future enlistment or phase 0 flush.
+ sessionp->dtxEnd(getXid(openXaTransaction), false, true, false);
+ }
+ // resuming
+ sessionp->dtxStart(getXid(xaTx), false, true, false);
+ }
+
+ // the closing (i.e. non-suspended) dtxEnd happens here (exactly once for a given transaction)
+ // set the sync bit since phase0 is a precondition to prepare or abort
+ completionp = (IntPtr) new XaResultCompletion(sessionp->dtxEnd(getXid(xaTx), false, false, true));
+ IncrementSyncs();
+ }
+ }
+ catch (System::Exception^ ) {
+ // all the caller wants to know is if completionp is non-null
+ }
+
+ openXaTransaction = nullptr;
+ openSystemTransaction = nullptr;
+ return completionp;
+}
+
+
+void AmqpSession::EndPhase0Flush(XaTransaction ^xaTx, IntPtr intptr) {
+ XaResultCompletion *completionp = (XaResultCompletion *) intptr.ToPointer();
+ lock l(sessionLock);
+
+ if (completionp != NULL) {
+ try {
+ l.release();
+ completionp->wait();
+ pendingTransactions->Remove(xaTx);
+ }
+ catch (System::Exception^) {
+ // connection closed or network drop
+ }
+ finally {
+ l.acquire();
+ DecrementSyncs();
+ delete completionp;
+ }
+ }
+}
+
+
+IntPtr AmqpSession::DtxStart(IntPtr ip, bool join, bool resume) {
+ // called with session lock held (as a callback from the Enlist())
+ // The XaTransaction knows if this should be the originating dtxStart, or a join/resume
+ IntPtr rv = IntPtr::Zero;
+ qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer();
+ if (join || resume) {
+ sessionp->dtxStart(*xidp, join, resume, false);
+ }
+ else {
+ // The XaTransaction needs to track when the first dtxStart completes to safely request a join
+ IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs
+ rv = (IntPtr) new XaResultCompletion(sessionp->dtxStart(*xidp, join, resume, false));
+ }
+
+ return rv;
+}
+
+
+IntPtr AmqpSession::DtxPrepare(IntPtr ip) {
+ qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer();
+ lock l(sessionLock);
+
+ if (closing)
+ return IntPtr::Zero;
+
+ IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs
+ return (IntPtr) new XaResultCompletion(sessionp->dtxPrepare(*xidp, true));
+}
+
+
+IntPtr AmqpSession::DtxCommit(IntPtr ip, bool onePhase) {
+ qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer();
+ lock l(sessionLock);
+
+ if (closing)
+ return IntPtr::Zero;
+
+ IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs
+ return (IntPtr) new XaResultCompletion(sessionp->dtxCommit(*xidp, onePhase, true));
+}
+
+
+IntPtr AmqpSession::DtxRollback(IntPtr ip) {
+ qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer();
+ lock l(sessionLock);
+ if (closing)
+ return IntPtr::Zero;
+
+ IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs
+
+ return (IntPtr) new XaResultCompletion(sessionp->dtxRollback(*xidp, true));
+}
+
+
+//call with lock held
+void AmqpSession::IncrementSyncs() {
+ syncCount++;
+}
+
+
+//call with lock held
+void AmqpSession::DecrementSyncs() {
+ syncCount--;
+ Debug::Assert(syncCount >= 0, "sync counter underrun");
+ if (syncCount == 0) {
+ if (closeWaitHandle != nullptr) {
+ // now OK to move from closing to closed
+ closeWaitHandle->Set();
+ }
+ }
+}
+
+
+// call with lock held
+void AmqpSession::WaitLastSync(lock ^l) {
+ if (syncCount == 0)
+ return;
+ if (AppDomain::CurrentDomain->IsFinalizingForUnload()) {
+ // a wait would be a hang. No more syncs coming
+ return;
+ }
+ if (closeWaitHandle == nullptr)
+ closeWaitHandle = gcnew ManualResetEvent(false);
+ l->release();
+ closeWaitHandle->WaitOne();
+ l->acquire();
+}
+
+
+void AmqpSession::ReleaseCompletion(IntPtr completion) {
+ lock l(sessionLock);
+ DecrementSyncs();
+ delete completion.ToPointer();
+}
+
}}} // namespace Apache::Qpid::Cli
diff --git a/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/wcf/src/Apache/Qpid/Interop/AmqpSession.h
index 8306cdf720..88ffd18dcc 100644
--- a/wcf/src/Apache/Qpid/Interop/AmqpSession.h
+++ b/wcf/src/Apache/Qpid/Interop/AmqpSession.h
@@ -29,29 +29,50 @@ namespace Interop {
using namespace System;
using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
+using namespace System::Diagnostics;
+
using namespace qpid::client;
using namespace std;
ref class InputLink;
ref class OutputLink;
+ref class XaTransaction;
public ref class AmqpSession
{
private:
+ Object^ sessionLock;
+ Object^ openCloseLock;
AmqpConnection^ connection;
- Connection* connectionp;
AsyncSession* sessionp;
SessionImpl* sessionImplp;
SubscriptionManager* subs_mgrp;
- LocalQueue* localQueuep;
Collections::Generic::List<CompletionWaiter^>^ waiters;
bool helperRunning;
+
+ // number of active InputLinks and OutputLinks
int openCount;
+ // the number of async commands sent to the broker that need completion confirmation
+ int syncCount;
+
+ bool closing;
+ ManualResetEvent^ closeWaitHandle;
+ bool dtxEnabled;
+ Transaction^ openSystemTransaction;
+ XaTransaction^ openXaTransaction;
+ Collections::Generic::List<XaTransaction^>^ pendingTransactions;
+
void Cleanup();
+ void CheckOpen();
void asyncHelper(Object ^);
void addWaiter(CompletionWaiter^ waiter);
+ void UpdateTransactionState(msclr::lock^ sessionLock);
+ void IncrementSyncs();
+ void DecrementSyncs();
+ void WaitLastSync(msclr::lock^ l);
public:
OutputLink^ CreateOutputLink(System::String^ targetQueue);
@@ -66,16 +87,21 @@ internal:
void NotifyClosed();
CompletionWaiter^ SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state);
void ConnectionClosed();
- void internalWaitForCompletion(IntPtr Future);
+ void internalWaitForCompletion(IntPtr future);
void removeWaiter(CompletionWaiter^ waiter);
- bool MessageStop(Completion &comp, std::string &name);
+ bool MessageStop(std::string &name);
void AcceptAndComplete(SequenceSet& transfers);
+ IntPtr BeginPhase0Flush(XaTransaction^);
+ void EndPhase0Flush(XaTransaction^, IntPtr);
+ IntPtr DtxStart(IntPtr xidp, bool, bool);
+ IntPtr DtxPrepare(IntPtr xidp);
+ IntPtr DtxCommit(IntPtr xidp, bool onePhase);
+ IntPtr DtxRollback(IntPtr xidp);
+ void ReleaseCompletion(IntPtr completion);
property AmqpConnection^ Connection {
AmqpConnection^ get () { return connection; }
}
-
-
};
}}} // namespace Apache::Qpid::Interop
diff --git a/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp b/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp
new file mode 100644
index 0000000000..6ea31f8401
--- /dev/null
+++ b/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp
@@ -0,0 +1,285 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <transact.h>
+#include <xolehlp.h>
+#include <txdtc.h>
+#include <oletx2xa.h>
+#include <iostream>
+#include <fstream>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/FrameSet.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "DtxResourceManager.h"
+#include "XaTransaction.h"
+#include "QpidException.h"
+#include "QpidMarshal.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
+using namespace msclr;
+
+
+/*
+ * There is one DtxResourceManager per broker and per application process.
+ *
+ * Each RM manages a collection of active XaTransaction objects. Participating AmqpSessions enlist
+ * (or re-enlist) with an XaTransaction indexed by the corresponding System.Transaction object. The
+ * RM maintains its own AmqpSession for sending 2PC commnds (dtxPrepare, dtxCommit etc.). The
+ * XaTransaction object works through the lifecycle of the Transaction, including prompting the
+ * enlisted sessions to send their delimiting dtxEnd commands.
+ *
+ * A separate DtcPlugin.cpp file provides the recovery logic when needed in a library named
+ * qpidxarm.dll. The MSDTC maintans recovery info in its log and tracks when there may be
+ * transactions in doubt. See the documentation for IDtcToXaHelperSinglePipe.
+ *
+ * To enable transaction support:
+ * DTC requires a registry key to find the plugin
+ * [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\MSDTC\XADLL] qpidxarm.dll -> [path to qpidxarm.dll]
+ * DTC needs to be configured for XA
+ * cmdprompt -> dcomcnfg -> Component services -> My Computer -> DTC -> Local DTC -> right click properties -> Security -> Enable XA Transactions
+ *
+ */
+
+// TODO: provide shutdown mechanism, perhaps callback from Connection Idle for enlisted connections.
+// But note that a new RM registration with the DTC is very expensive.
+
+
+DtxResourceManager::DtxResourceManager(AmqpConnection^ appConnection) {
+ dtcComp = NULL;
+ xaHelperp = NULL;
+ rmCookie = 0;
+ doubtCount = 0;
+ tmDown = false;
+ AmqpConnection^ clonedCon = appConnection->Clone();
+ dtxControlSession = clonedCon->CreateSession();
+ dataSourceName = clonedCon->DataSourceName;
+ transactionMap = gcnew Collections::Generic::Dictionary<Transaction^, XaTransaction^>();
+
+ HRESULT hr;
+
+ try {
+ // instead of pinning this instance, just use tmp stack variables for small stuff
+ IUnknown* tmp = NULL;
+ // request the default DTC
+ hr = DtcGetTransactionManager(NULL, NULL, IID_IUnknown, 0, 0, 0, (void **)&tmp);
+ if (hr != S_OK)
+ throw gcnew QpidException("connection failure to DTC service");
+ dtcComp = tmp;
+
+ IDtcToXaHelperSinglePipe *tmp2 = NULL;
+ hr = ((IUnknown *)dtcComp)->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void**) &tmp2);
+ if (hr != S_OK)
+ throw gcnew QpidException("DTC XA unavailable");
+ xaHelperp = tmp2;
+
+ std::string native_dsn = QpidMarshal::ToNative(dataSourceName);
+ DWORD tmp3;
+
+ // This call doesn't return until the DTC has opened and closed a connection to the broker
+ // and written a recovery entry in its log.
+ hr = ((IDtcToXaHelperSinglePipe *) xaHelperp)->XARMCreate(const_cast<char *>(native_dsn.c_str()), "qpidxarm.dll", &tmp3);
+ if (hr != S_OK) {
+ switch (hr) {
+ case E_FAIL:
+ throw gcnew QpidException("Resource Manager DLL configuration error");
+ case E_INVALIDARG:
+ throw gcnew QpidException("Resource Manager internal error");
+ case E_OUTOFMEMORY:
+ throw gcnew QpidException("Resource Manager out of memory");
+ case E_UNEXPECTED:
+ throw gcnew QpidException("Resource Manager internal failure");
+ case XACT_E_TMNOTAVAILABLE:
+ case XACT_E_CONNECTION_DOWN:
+ throw gcnew QpidException("MSDTC unavailable");
+
+ default:
+ throw gcnew QpidException("Resource Manager Registration failed");
+ }
+ }
+
+ rmCookie = tmp3;
+ }
+ finally {
+ if (rmCookie == 0) {
+ // undo partial construction
+ Cleanup();
+ }
+ }
+}
+
+
+DtxResourceManager::!DtxResourceManager() {
+ Cleanup();
+}
+
+
+DtxResourceManager::~DtxResourceManager() {
+ GC::SuppressFinalize(this);
+ Cleanup();
+}
+
+
+// Called when the DTC COM proxy sends TMDOWN to a pending XaTransaction
+// called once for each outstanding tx
+
+void DtxResourceManager::TmDown() {
+ // this block is the only place where both locks are held
+ lock l1(transactionMap);
+ lock l2(resourceManagerMap);
+ if (tmDown)
+ return;
+
+ tmDown = true;
+ resourceManagerMap->Remove(this->dataSourceName);
+ // defer cleanup until last TmDown notification received
+}
+
+
+
+void DtxResourceManager::Cleanup() {
+ for each (Collections::Generic::KeyValuePair<Transaction^, XaTransaction^> kvp in transactionMap) {
+ XaTransaction^ xaTr = kvp.Value;
+ xaTr->ChildFinalize();
+ }
+
+ try {
+ if (rmCookie != 0) {
+ // implies no recovery needed
+ bool cleanSession = (doubtCount == 0) && (transactionMap->Count == 0);
+ ((IDtcToXaHelperSinglePipe *)xaHelperp)->ReleaseRMCookie(rmCookie, cleanSession);
+ rmCookie = 0;
+ }
+
+
+ if (xaHelperp != NULL) {
+ ((IDtcToXaHelperSinglePipe *) xaHelperp)->Release();
+ xaHelperp = NULL;
+ }
+
+ if (dtcComp != NULL) {
+ ((IUnknown *) dtcComp)->Release();
+ dtcComp = NULL;
+ }
+
+ if (dtxControlSession != nullptr) {
+ dtxControlSession->Connection->Close();
+ }
+
+ }
+ catch (Exception^) {}
+}
+
+
+XaTransaction^ DtxResourceManager::GetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) {
+ // find or create the RM instance associated with the session's broker
+ AmqpConnection^ connection = appSession->Connection;
+ DtxResourceManager^ instance = connection->CachedResourceManager;
+
+ // try cached rm first
+ if (instance != nullptr) {
+ XaTransaction^ xaTx = instance->InternalGetXaTransaction(appSession, transaction);
+ if (xaTx != nullptr)
+ return xaTx;
+ else {
+ // cached version no longer available, force new rm creation
+ connection->CachedResourceManager = nullptr;
+ }
+ }
+
+ lock l(resourceManagerMap);
+ String^ dsn = connection->DataSourceName;
+ if (!resourceManagerMap->TryGetValue(dsn, instance)) {
+ instance = gcnew DtxResourceManager(connection->Clone());
+ resourceManagerMap->Add(dsn, instance);
+ connection->CachedResourceManager = instance;
+ }
+ l.release();
+
+ return instance->InternalGetXaTransaction(appSession, transaction);
+}
+
+
+XaTransaction^ DtxResourceManager::InternalGetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) {
+ // find or create the tx proxy instance associated with the DTC transaction
+ lock l(transactionMap);
+ if (tmDown)
+ return nullptr;
+
+ XaTransaction^ xaTransaction = nullptr;
+ if (!transactionMap->TryGetValue(transaction, xaTransaction)) {
+ xaTransaction = gcnew XaTransaction(transaction, (IDtcToXaHelperSinglePipe *) xaHelperp, rmCookie, this);
+ transactionMap->Add(transaction, xaTransaction);
+ }
+
+ return xaTransaction;
+}
+
+void DtxResourceManager::Complete(Transaction ^tx) {
+ lock l(transactionMap);
+ transactionMap->Remove(tx);
+
+ if (tmDown && (transactionMap->Count == 0)) {
+ // no more activity on this instance
+ GC::SuppressFinalize(this);
+ Cleanup();
+ }
+}
+
+
+void DtxResourceManager::IncrementDoubt() {
+ Interlocked::Increment(doubtCount);
+}
+
+
+void DtxResourceManager::DecrementDoubt() {
+ Interlocked::Decrement(doubtCount);
+}
+
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+void DtxResourceManager::ForceRecovery(Transaction ^tx) {
+ lock l(resourceManagerMap);
+ for each (Collections::Generic::KeyValuePair<System::String^, DtxResourceManager^> kvp in resourceManagerMap) {
+
+ Collections::Generic::Dictionary<Transaction^, XaTransaction^>^ txmap = kvp.Value->transactionMap;
+ XaTransaction^ xaTransaction = nullptr;
+ lock l2(txmap);
+ if (txmap->TryGetValue(tx, xaTransaction)) {
+ xaTransaction->ForceRecovery();
+ }
+ }
+}
+#endif
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h b/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h
new file mode 100644
index 0000000000..7df491eec2
--- /dev/null
+++ b/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Transactions;
+
+ref class XaTransaction;
+
+public ref class DtxResourceManager
+{
+private:
+ // Receive() or WaitForMessage()
+ AmqpSession^ dtxControlSession;
+ String^ dataSourceName;
+ bool consumed;
+ DWORD rmCookie;
+ void* xaHelperp;
+ void* dtcComp;
+ int doubtCount;
+ DtxResourceManager(AmqpConnection^);
+ XaTransaction^ InternalGetXaTransaction (AmqpSession^ session, Transaction^ transaction);
+ bool tmDown;
+
+ // The active transactions
+ Collections::Generic::Dictionary<Transaction^, XaTransaction^>^ transactionMap;
+
+ // one resource manager per AMQP broker per process
+ static Collections::Generic::Dictionary<System::String^, DtxResourceManager^>^ resourceManagerMap =
+ gcnew Collections::Generic::Dictionary<System::String^, DtxResourceManager^>();
+
+ void Cleanup();
+ ~DtxResourceManager();
+ !DtxResourceManager();
+
+internal:
+ static XaTransaction^ GetXaTransaction (AmqpSession^ session, Transaction^ transaction);
+ void Complete(Transaction ^tx);
+ void TmDown();
+
+ property AmqpSession^ DtxControlSession {
+ AmqpSession^ get () { return dtxControlSession; }
+ }
+
+ void IncrementDoubt();
+ void DecrementDoubt();
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+public:
+ static void ForceRecovery(Transaction ^tx);
+#endif
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/wcf/src/Apache/Qpid/Interop/InputLink.cpp
index e12151d943..3245cd3540 100644
--- a/wcf/src/Apache/Qpid/Interop/InputLink.cpp
+++ b/wcf/src/Apache/Qpid/Interop/InputLink.cpp
@@ -86,6 +86,8 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
System::Exception^ linkException = nullptr;
waiters = gcnew Collections::Generic::List<MessageWaiter^>();
+ linkLock = waiters; // private and available
+ subscriptionLock = gcnew Object();
try {
std::string qname = QpidMarshal::ToNative(sourceQueue);
@@ -120,10 +122,13 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
}
}
+// called with lock held
void InputLink::ReleaseNative()
{
// involves talking to the Broker unless the connection is broken
- if (subscriptionp != NULL) {
+
+ if ((subscriptionp != NULL) && !finalizing) {
+ // TODO: find boost time error on cleanup when in finalizer thread
try {
subscriptionp->cancel();
}
@@ -134,20 +139,31 @@ void InputLink::ReleaseNative()
}
// free native mem (or smart pointers) that we own
- if (subscriptionp != NULL)
+ if (subscriptionp != NULL) {
delete subscriptionp;
- if (queuePtrp != NULL)
+ subscriptionp = NULL;
+ }
+ if (queuePtrp != NULL) {
delete queuePtrp;
- if (localQueuep != NULL)
- delete localQueuep;
- if (dequeuedFrameSetpp != NULL)
+ queuePtrp = NULL;
+ }
+ if (localQueuep != NULL) {
+ if (!finalizing) {
+ // TODO: find boost time error on cleanup when in finalizer thread
+ delete localQueuep;
+ localQueuep = NULL;
+ }
+ }
+ if (dequeuedFrameSetpp != NULL) {
delete dequeuedFrameSetpp;
+ dequeuedFrameSetpp = NULL;
+ }
}
void InputLink::Cleanup()
{
{
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return;
@@ -162,6 +178,9 @@ void InputLink::Cleanup()
if (queuePtrp != NULL)
(*queuePtrp)->close();
+ // wait for any sync operations on the subscription to complete before ReleaseNative
+ lock l2(subscriptionLock);
+
try {}
finally
{
@@ -179,6 +198,7 @@ InputLink::~InputLink()
InputLink::!InputLink()
{
+ finalizing = true;
Cleanup();
}
@@ -204,7 +224,7 @@ bool InputLink::haveMessage()
IntPtr InputLink::nextLocalMessage()
{
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return (IntPtr) NULL;
@@ -250,7 +270,7 @@ IntPtr InputLink::nextLocalMessage()
void InputLink::unblockWaiter()
{
// to be followed by resetQueue() below
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return;
(*queuePtrp)->close();
@@ -264,7 +284,7 @@ void InputLink::unblockWaiter()
void InputLink::resetQueue()
{
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return;
if ((*queuePtrp)->isClosed()) {
@@ -282,7 +302,7 @@ bool InputLink::internalWaitForMessage()
bool received = false;
QpidFrameSetPtr* frameSetpp = NULL;
try {
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return false;
if (haveMessage())
@@ -348,7 +368,7 @@ void InputLink::addWaiter(MessageWaiter^ waiter)
void InputLink::removeWaiter(MessageWaiter^ waiter) {
// a waiter can be removed from anywhere in the list if timed out
- lock l(waiters);
+ lock l(linkLock);
int idx = waiters->IndexOf(waiter);
if (idx == -1) {
// TODO: assert or log
@@ -388,7 +408,7 @@ void InputLink::removeWaiter(MessageWaiter^ waiter) {
void InputLink::asyncHelper()
{
- lock l(waiters);
+ lock l(linkLock);
while (true) {
if (disposed && (waiters->Count == 0)) {
@@ -419,14 +439,14 @@ void InputLink::asyncHelper()
void InputLink::sync()
{
- // for the timeout thread
- lock l(waiters);
+ // used by the MessageWaiter timeout thread to not run before fully initialized
+ lock l(linkLock);
}
void InputLink::PrefetchLimit::set(int value)
{
- lock l(waiters);
+ lock l(linkLock);
prefetchLimit = value;
int delta = 0;
@@ -475,31 +495,32 @@ void InputLink::AdjustCredit()
void InputLink::SyncCredit(Object ^unused)
{
- lock l(waiters);
+ lock l(linkLock);
try {
if (disposed)
return;
- Completion comp;
- if (!amqpSession->MessageStop(comp, subscriptionp->getName())) {
+ if (!amqpSession->MessageStop(subscriptionp->getName())) {
// connection closed
return;
}
- // get a private scoped copy to use outside the lock
- Subscription s(*subscriptionp);
-
l.release();
// use setFlowControl to re-enable credit flow on the broker.
- // previously used comp.wait() here, but setFlowControl is a sync operation
- s.setFlowControl(s.getSettings().flowControl);
+ // setFlowControl is a sync operation
+ {
+ lock l2(subscriptionLock);
+ if (subscriptionp != NULL) {
+ subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl);
+ }
+ }
l.acquire();
if (disposed)
return;
- // let existing waiters use up any
+ // let existing waiters use up any messages that arrived.
// local queue size can only decrease until more credit is issued
while (true) {
if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
@@ -700,7 +721,7 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage)
{
- lock l(waiters);
+ lock l(linkLock);
if (waiters->Count == 0) {
// see if there is a message already available without blocking
@@ -740,7 +761,7 @@ IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callba
//TODO: if haveMessage() complete synchronously
- lock l(waiters);
+ lock l(linkLock);
MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state);
addWaiter(waiter);
return waiter;
@@ -779,7 +800,10 @@ bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMess
bool InputLink::WaitForMessage(TimeSpan timeout)
{
- lock l(waiters);
+ lock l(linkLock);
+
+ if (disposed)
+ return false;
if (waiters->Count == 0) {
// see if there is a message already available without blocking
@@ -799,12 +823,12 @@ bool InputLink::WaitForMessage(TimeSpan timeout)
return false;
}
- return true;
+ return haveMessage();
}
IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
{
- lock l(waiters);
+ lock l(linkLock);
// Same as for BeginTryReceive, except consuming = false
MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state);
@@ -822,7 +846,7 @@ bool InputLink::EndWaitForMessage(IAsyncResult^ result)
return false;
}
- return true;
+ return haveMessage();
}
diff --git a/wcf/src/Apache/Qpid/Interop/InputLink.h b/wcf/src/Apache/Qpid/Interop/InputLink.h
index f59a03a8c3..2f96b91944 100644
--- a/wcf/src/Apache/Qpid/Interop/InputLink.h
+++ b/wcf/src/Apache/Qpid/Interop/InputLink.h
@@ -45,6 +45,8 @@ private:
Collections::Generic::List<MessageWaiter^>^ waiters;
bool disposed;
bool finalizing;
+ Object^ linkLock;
+ Object^ subscriptionLock;
QpidFrameSetPtr* dequeuedFrameSetpp;
ManualResetEvent^ asyncHelperWaitHandle;
// number of messages to buffer locally for future consumption
diff --git a/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/wcf/src/Apache/Qpid/Interop/Interop.vcproj
index b662be9d54..c2d6b30fff 100644
--- a/wcf/src/Apache/Qpid/Interop/Interop.vcproj
+++ b/wcf/src/Apache/Qpid/Interop/Interop.vcproj
@@ -83,7 +83,7 @@
/>
<Tool
Name="VCLinkerTool"
- AdditionalOptions="$(QPID_BUILD_ROOT)\src\Debug\qpidcommond.lib $(QPID_BUILD_ROOT)\src\Debug\qpidclientd.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalOptions="$(QPID_BUILD_ROOT)\src\Debug\qpidcommond.lib $(QPID_BUILD_ROOT)\src\Debug\qpidclientd.lib Debug\Apache.Qpid.AmqpTypes.netmodule xolehlp.lib"
AdditionalDependencies="$(NoInherit)"
OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
LinkIncremental="2"
@@ -207,6 +207,11 @@
AssemblyName="System.Runtime.Serialization, Version=3.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
MinFrameworkVersion="196608"
/>
+ <AssemblyReference
+ RelativePath="System.Transactions.dll"
+ AssemblyName="System.Transactions, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=x86"
+ MinFrameworkVersion="131072"
+ />
</References>
<Files>
<Filter
@@ -250,6 +255,14 @@
RelativePath=".\OutputLink.cpp"
>
</File>
+ <File
+ RelativePath=".\DtxResourceManager.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\XaTransaction.cpp"
+ >
+ </File>
</Filter>
<Filter
Name="Header Files"
@@ -296,6 +309,14 @@
RelativePath=".\QpidMarshal.h"
>
</File>
+ <File
+ RelativePath=".\DtxResourceManager.h"
+ >
+ </File>
+ <File
+ RelativePath=".\XaTransaction.h"
+ >
+ </File>
</Filter>
</Files>
<Globals>
diff --git a/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp b/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
new file mode 100644
index 0000000000..23743316ff
--- /dev/null
+++ b/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
@@ -0,0 +1,525 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <transact.h>
+#include <xolehlp.h>
+#include <txdtc.h>
+#include <oletx2xa.h>
+#include <iostream>
+#include <fstream>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/Xid.h"
+
+#include "QpidException.h"
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "DtxResourceManager.h"
+#include "XaTransaction.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
+using namespace msclr;
+
+using namespace qpid::framing::dtx;
+
+// ------------------------------------------------------------------------
+// Start of a pure native code section
+#pragma unmanaged
+// ------------------------------------------------------------------------
+
+// This is the native COM object the DTC expects to talk to for coordination.
+// There is exactly one native instance of this for each managed XaTransaction object.
+
+
+class DtcCallbackHandler : public ITransactionResourceAsync
+{
+private:
+ long useCount;
+ DtcCallbackFp managedCallback;
+public:
+ ITransactionEnlistmentAsync *txHandle;
+ DtcCallbackHandler(DtcCallbackFp cbp) : managedCallback(cbp), useCount(0) {}
+ ~DtcCallbackHandler() {}
+ virtual HRESULT __stdcall PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase);
+ virtual HRESULT __stdcall CommitRequest(DWORD grfrm, XACTUOW *unused);
+ virtual HRESULT __stdcall AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3);
+
+ virtual HRESULT __stdcall TMDown();
+ virtual HRESULT __stdcall DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject);
+ virtual ULONG __stdcall DtcCallbackHandler::AddRef();
+ virtual ULONG __stdcall DtcCallbackHandler::Release();
+ void __stdcall AbortRequestDone();
+};
+
+
+HRESULT DtcCallbackHandler::PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase)
+{
+ if (singlePhase) {
+ return managedCallback(DTC_SINGLE_PHASE) ? S_OK : E_FAIL;
+ }
+
+ return managedCallback(DTC_PREPARE) ? S_OK : E_FAIL;
+}
+
+
+HRESULT DtcCallbackHandler::CommitRequest(DWORD grfrm, XACTUOW *unused)
+{
+ return managedCallback(DTC_COMMIT) ? S_OK : E_FAIL;
+}
+
+HRESULT DtcCallbackHandler::AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3)
+{
+ return managedCallback(DTC_ABORT) ? S_OK : E_FAIL;
+}
+
+
+HRESULT DtcCallbackHandler::TMDown()
+{
+ return managedCallback(DTC_TMDOWN) ? S_OK : E_FAIL;
+}
+
+
+HRESULT DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject)
+{
+ *ppvObject = NULL;
+
+ if ((riid == IID_IUnknown) || (riid == IID_IResourceManagerSink))
+ *ppvObject = this;
+ else
+ return ResultFromScode(E_NOINTERFACE);
+
+ this->AddRef();
+ return S_OK;
+}
+
+
+ULONG DtcCallbackHandler::AddRef()
+{
+ return InterlockedIncrement(&useCount);
+}
+
+
+ULONG DtcCallbackHandler::Release()
+{
+ long uc = InterlockedDecrement(&useCount);
+
+ if (uc)
+ return uc;
+
+ delete this;
+ return 0;
+}
+
+
+// ------------------------------------------------------------------------
+// End of pure native code section
+#pragma managed
+// ------------------------------------------------------------------------
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+void XaTransaction::ForceRecovery() {
+ debugFailMode = true;
+}
+#endif
+
+// ------------------------------------------------------------------------
+// ------------------------------------------------------------------------
+
+
+XaTransaction::XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *xaHelperp, DWORD rmCookie, DtxResourceManager^ rm) {
+ bool success = false;
+ xidp = NULL;
+ commandCompletionp = NULL;
+ firstDtxStartCompletionp = NULL;
+ nativeHandler = NULL;
+ resourceManager = rm;
+ controlSession = rm->DtxControlSession;
+ active = true;
+ preparing = false;
+ systemTransaction = t;
+ IntPtr comTxp = IntPtr::Zero;
+ completionHandle = gcnew ManualResetEvent(false);
+
+ try {
+ enlistedSessions = gcnew Collections::Generic::List<AmqpSession^>();
+
+ // take a System.Transactions.Transaction and obtain
+ // the corresponding DTC COM object.
+ IDtcTransaction^ dtcTransaction = TransactionInterop::GetDtcTransaction(t);
+ comTxp = Marshal::GetIUnknownForObject(dtcTransaction);
+ XID winXid;
+ HRESULT hr = xaHelperp->ConvertTridToXID((DWORD *)comTxp.ToPointer(), rmCookie, &winXid);
+ if (hr != S_OK)
+ throw gcnew QpidException("get XA XID");
+
+ // Convert the X/Open format to the internal Qpid format
+ xidp = new qpid::framing::Xid();
+ xidp->setFormat((uint32_t) winXid.formatID);
+ int bqualPos = 0;
+ if (winXid.gtrid_length > 0) {
+ xidp->setGlobalId(std::string(winXid.data, winXid.gtrid_length));
+ bqualPos = winXid.gtrid_length;
+ }
+ if (winXid.bqual_length > 0) {
+ xidp->setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length));
+ }
+
+ // create the callback chain: DTC proxy -> DtcCallbackHandler -> this
+ inboundDelegate = gcnew DtcCallbackDelegate(this, &XaTransaction::DtcCallback);
+ IntPtr ip = Marshal::GetFunctionPointerForDelegate(inboundDelegate);
+ nativeHandler = new DtcCallbackHandler(static_cast<DtcCallbackFp>(ip.ToPointer()));
+ // add myself for later smart pointer destruction
+ nativeHandler->AddRef();
+
+ hr = xaHelperp->EnlistWithRM(rmCookie, (ITransaction *)comTxp.ToPointer(), nativeHandler, &(nativeHandler->txHandle));
+
+ if (hr != S_OK)
+ throw gcnew QpidException("Enlist");
+
+ success = true;
+ }
+ finally {
+ if (!success)
+ Cleanup();
+ if (comTxp != IntPtr::Zero)
+ ((IUnknown *) comTxp.ToPointer())->Release();
+ }
+}
+
+
+void XaTransaction::Cleanup() {
+ if (firstDtxStartCompletionp != NULL) {
+ try {
+ firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+
+ firstDtxStartCompletionp = NULL;
+ }
+
+ if (nativeHandler != NULL) {
+ nativeHandler->Release();
+ nativeHandler = NULL;
+ }
+ if (xidp != NULL) {
+ delete xidp;
+ xidp = NULL;
+ }
+}
+
+
+XaTransaction^ XaTransaction::Enlist (AmqpSession ^session) {
+ lock l(enlistedSessions);
+ if (!active)
+ throw gcnew QpidException("transaction enlistment internal error");
+ if (!enlistedSessions->Contains(session)) {
+ enlistedSessions->Add(session);
+ if (firstEnlistedSession == nullptr) {
+ firstEnlistedSession = session;
+ IntPtr intptr = session->DtxStart((IntPtr) xidp, false, false);
+ firstDtxStartCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ }
+ else {
+ // the broker must see the dtxStart as a join operation, and it must arrive
+ // at the broker after the first dtx start
+ if (firstDtxStartCompletionp != NULL)
+ firstDtxStartCompletionp->wait();
+ session->DtxStart((IntPtr) xidp, true, false);
+ }
+ }
+ else {
+ // already started once, so resume is true
+ session->DtxStart((IntPtr) xidp, false, true);
+ }
+ return this;
+}
+
+
+void XaTransaction::SessionClosing(AmqpSession^ session) {
+ lock l(enlistedSessions);
+ if (!enlistedSessions->Contains(session))
+ return;
+
+ enlistedSessions->Remove(session);
+ if (!active) {
+ // Phase0Flush already done on all sessions
+ l.release();
+ return;
+ }
+
+ IntPtr completion = session->BeginPhase0Flush(this);
+ session->EndPhase0Flush(this, completion);
+
+ if (session == firstEnlistedSession) {
+ // if we just completed the dtxEnd, we know the dtxStart completed before that
+ if (firstDtxStartCompletionp != NULL) {
+ firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
+ firstDtxStartCompletionp = NULL;
+ }
+ }
+}
+
+
+void XaTransaction::Phase0Flush() {
+ // let each session delimit their transactional work with an AMQP dtx.end protocol frame
+ lock l(enlistedSessions);
+ if (!active)
+ return;
+
+ active = false; // no more enlistments
+ int scount = enlistedSessions->Count;
+
+ if (scount > 0) {
+ array<IntPtr> ^completions = gcnew array<IntPtr>(scount);
+ for (int i = 0; i < scount; i++) {
+
+ // TODO: skip phase0 flush for rollback case
+
+ completions[i] = enlistedSessions[i]->BeginPhase0Flush(this);
+ }
+
+ for (int i = 0; i < scount; i++) {
+ // without each session.sync(), session commands are queued up in the right order,
+ // but on their separate outbound channels, and destined for receipt at separate Broker inbound
+ // channels. It is not clear how to be sure Phase 0 dtx.End is processed in the
+ // correct order before commit on the broker without the sync.
+ enlistedSessions[i]->EndPhase0Flush(this, completions[i]);
+ }
+ }
+
+ // since all dtxEnds have completed, we know all starts have too
+ if (firstDtxStartCompletionp != NULL) {
+ try {
+ firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+
+ firstDtxStartCompletionp = NULL;
+ }
+}
+
+
+bool XaTransaction::DtcCallback (DtcCallbackType callback) {
+ // called by the DTC proxy thread. Be brief and don't block (but Phase0Flush?)
+
+ if (AppDomain::CurrentDomain->IsFinalizingForUnload())
+ return false;
+
+ IntPtr intptr = IntPtr::Zero;
+ currentCommand = callback;
+
+ try {
+ switch (callback) {
+ case DTC_PREPARE:
+ Phase0Flush();
+ try {
+ intptr = controlSession->DtxPrepare((IntPtr) xidp);
+ preparing = true;
+ resourceManager->IncrementDoubt();
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_COMMIT:
+#ifdef QPID_RECOVERY_TEST_HOOK
+ if (debugFailMode){ return; }
+#endif
+ // no phase 0 required. always preceded by a prepare
+ try {
+ intptr = controlSession->DtxCommit((IntPtr) xidp, false);
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_ABORT:
+ Phase0Flush();
+#ifdef QPID_RECOVERY_TEST_HOOK
+ if (debugFailMode){ return; }
+#endif
+ try {
+ intptr = controlSession->DtxRollback((IntPtr) xidp);
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_SINGLE_PHASE:
+ Phase0Flush();
+ try {
+ intptr = controlSession->DtxCommit((IntPtr) xidp, true);
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_TMDOWN:
+ commandCompletionp = NULL;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+ }
+ return true;
+ }
+ catch (System::Exception^ e) {
+ // TODO: log it
+ Console::WriteLine("Unexpected DtcCallback exception: {0}", e->ToString());
+ }
+ catch (...) {
+ // TODO: log it
+ }
+ return false;
+}
+
+
+// this handles the case where the application regains control for
+// a new transaction before we are notified (abort/rollback
+// optimization in DTC).
+
+void XaTransaction::NotifyPhase0() {
+ if (active)
+ Phase0Flush();
+}
+
+
+void XaTransaction::AsyncCompleter(Object ^unused) {
+ bool success = false;
+
+ if (commandCompletionp != NULL) {
+ try {
+ // waits for the AMQP broker's response and returns the decoded content
+ XaResult& xaResult = commandCompletionp->get();
+ if (xaResult.hasStatus()) {
+ if (xaResult.getStatus() == XaStatus::XA_STATUS_XA_OK) {
+ success = true;
+ }
+ }
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+ try {
+ controlSession->ReleaseCompletion((IntPtr) commandCompletionp);
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+
+ commandCompletionp = NULL;
+ }
+
+ ITransactionEnlistmentAsync *dtcTxHandle = nativeHandler->txHandle;
+
+ HRESULT hr = success ? S_OK : E_FAIL;
+
+ switch (currentCommand) {
+ case DTC_PREPARE:
+ dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
+ break;
+
+ case DTC_COMMIT:
+ dtcTxHandle->CommitRequestDone(hr);
+ if (success)
+ resourceManager->DecrementDoubt();
+ Complete();
+ break;
+
+ case DTC_ABORT:
+ dtcTxHandle->AbortRequestDone(hr);
+ if (success) {
+ if (preparing) {
+ preparing = false;
+ resourceManager->DecrementDoubt();
+ }
+ }
+ Complete();
+ break;
+
+ case DTC_SINGLE_PHASE:
+ if (success)
+ hr = XACT_S_SINGLEPHASE;
+ dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
+ Complete();
+ break;
+
+ case DTC_TMDOWN:
+ // Stop the RM from accepting new enlistments
+ resourceManager->TmDown();
+ Complete();
+ break;
+ }
+}
+
+
+void XaTransaction::Complete() {
+ Cleanup();
+ resourceManager->Complete(systemTransaction);
+ completionHandle->Set();
+}
+
+
+void XaTransaction::WaitForCompletion() {
+ completionHandle->WaitOne();
+}
+
+
+ /*
+void XaTransaction::WaitForFlush() {
+ isFlushedHandle->WaitOne();
+}
+ */
+
+// called from DtxResourceManager Finalize
+
+void XaTransaction::ChildFinalize() {
+ lock l(enlistedSessions);
+ Phase0Flush();
+ Cleanup();
+}
+
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/wcf/src/Apache/Qpid/Interop/XaTransaction.h b/wcf/src/Apache/Qpid/Interop/XaTransaction.h
new file mode 100644
index 0000000000..8ff9f99893
--- /dev/null
+++ b/wcf/src/Apache/Qpid/Interop/XaTransaction.h
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Transactions;
+
+enum DtcCallbackType{
+ DTC_PREPARE,
+ DTC_COMMIT,
+ DTC_ABORT,
+ DTC_SINGLE_PHASE,
+ DTC_TMDOWN
+};
+
+
+ref class DtxResourceManager;
+class DtcCallbackHandler;
+
+// Function pointer declaratiom for managed space delegate
+typedef bool (__stdcall *DtcCallbackFp)(DtcCallbackType);
+
+// and the delegate with the same signature
+public delegate bool DtcCallbackDelegate(DtcCallbackType);
+
+
+
+public ref class XaTransaction
+{
+private:
+ bool active;
+ DtxResourceManager^ resourceManager;
+ Transaction^ systemTransaction;
+ AmqpSession^ controlSession;
+ Collections::Generic::List<AmqpSession^>^ enlistedSessions;
+ qpid::framing::Xid* xidp;
+ DtcCallbackHandler* nativeHandler;
+ bool preparing;
+ DtcCallbackDelegate^ inboundDelegate;
+ // the Qpid async result of the AMQP dtx prepare/commit commands
+ TypedResult<qpid::framing::XaResult>* commandCompletionp;
+ // the Qpid async result of the first session to do dtx start
+ TypedResult<qpid::framing::XaResult>* firstDtxStartCompletionp;
+ ManualResetEvent^ completionHandle;
+
+ AmqpSession^ firstEnlistedSession;
+ DtcCallbackType currentCommand;
+ void AsyncCompleter(Object ^);
+ void Phase0Flush();
+ void Cleanup();
+ void Complete();
+
+internal:
+ XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *pXaHelper, DWORD rmCookie, DtxResourceManager^ rm);
+ XaTransaction^ Enlist (AmqpSession ^session);
+ bool DtcCallback (DtcCallbackType callback);
+ void NotifyPhase0();
+ void ChildFinalize();
+ void SessionClosing(AmqpSession^ session);
+ void WaitForCompletion();
+
+ property IntPtr XidHandle {
+ IntPtr get () { return (IntPtr) xidp; }
+ }
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+ void ForceRecovery();
+ bool debugFailMode;
+#endif
+
+};
+
+}}} // namespace Apache::Qpid::Interop
+