summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp')
-rw-r--r--qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp794
1 files changed, 794 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp b/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
new file mode 100644
index 0000000000..33d125e3c6
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp
@@ -0,0 +1,794 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+
+//
+// This module provides the backend recovery driver for Windows resource managers based on
+// the IDtcToXaHelperSinglePipe interface. The dll is loaded (LoadLibrary) directly into DTC
+// itself and runs at a different protection level from the resource manager instance, which
+// runs inside the application.
+//
+// The DTC dynamically loads this file, calls GetXaSwitch() to access the XA interface
+// implementation and unloads the dll when done.
+//
+// This DTC plugin is only called for registration and recovery. Each time the application
+// registers the Qpid resource manager with DTC, the plugin is loaded and a successful
+// connection via xa_open is confirmed before completing registration and saving the DSN
+// connection string in the DTC log for possible recovery. On recovery, the DSN is re-used to
+// re-establish a new connection with the broker and perform recovery.
+//
+// Because this plugin is not involved in coordinating any active transactions it only needs to
+// partially implement the XA interface.
+//
+// For the same reason, the locking strategy is simple. A single global lock is used.
+// Whenever networking activity is about to take place, the lock is relinquished and retaken
+// soon thereafter.
+
+
+#include <windows.h>
+#include <transact.h>
+#include <xolehlp.h>
+#include <txdtc.h>
+#include <xa.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Connection.h"
+
+
+#include <map>
+#include <iostream>
+#include <fstream>
+
+namespace Apache {
+namespace Qpid {
+namespace DtcPlugin {
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::framing::dtx;
+
+class ResourceManager
+{
+private:
+ Connection qpidConnection;
+ Session qpidSession;
+ bool active;
+ std::string host;
+ int port;
+ std::string username;
+ std::string password;
+ bool ssl;
+ bool saslPlain;
+
+ int rmid;
+ std::vector<qpid::framing::Xid> inDoubtXids;
+ // current scan position, or -1 if no scan
+ int cursor;
+public:
+ ResourceManager(int id, std::string h, int p, bool sslP, bool saslPlainP, std::string uname, std::string pass)
+ : rmid(id), host(h), port(p), ssl(sslP), saslPlain(saslPlainP), username(uname), password(pass),
+ active(false), cursor(-1) {}
+ ~ResourceManager() {}
+ INT open();
+ INT close();
+ INT commit(XID *xid);
+ INT rollback(XID *xid);
+ INT recover(XID *xids, long count, long flags);
+};
+
+
+CRITICAL_SECTION rmLock;
+
+std::map<int, ResourceManager*> rmMap;
+HMODULE thisDll = NULL;
+bool memLocked = false;
+
+#define QPIDHMCHARS 512
+
+
+void pinDll() {
+ if (!memLocked) {
+ char thisDllName[QPIDHMCHARS];
+ HMODULE ignore;
+
+ DWORD nc = GetModuleFileName(thisDll, thisDllName, QPIDHMCHARS);
+ if ((nc > 0) && (nc < QPIDHMCHARS)) {
+ memLocked = GetModuleHandleEx(GET_MODULE_HANDLE_EX_FLAG_PIN, thisDllName, &ignore);
+ }
+ }
+}
+
+
+void XaToQpid(XID &winXid, Xid &qpidXid) {
+ // convert from XA defined structure XID to the Qpid framing structure
+ qpidXid.setFormat((uint32_t) winXid.formatID);
+ int bqualPos = 0;
+ if (winXid.gtrid_length > 0) {
+ qpidXid.setGlobalId(std::string(winXid.data, winXid.gtrid_length));
+ bqualPos = winXid.gtrid_length;
+ }
+ if (winXid.bqual_length > 0) {
+ qpidXid.setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length));
+ }
+}
+
+
+// this function assumes that the qpidXid has already been validated for the memory copy
+
+void QpidToXa(Xid &qpidXid, XID &winXid) {
+ // convert from the Qpid framing structure to the XA defined structure XID
+ winXid.formatID = qpidXid.getFormat();
+
+ const std::string& global_s = qpidXid.getGlobalId();
+ size_t gl = global_s.size();
+ winXid.gtrid_length = (long) gl;
+ if (gl > 0)
+ global_s.copy(winXid.data, gl);
+
+ const std::string branch_s = qpidXid.getBranchId();
+ size_t bl = branch_s.size();
+ winXid.bqual_length = (long) bl;
+ if (bl > 0)
+ branch_s.copy(winXid.data + gl, bl);
+}
+
+
+static char *dsnHeader = "QPIDdsnV2";
+
+const char* nextDot(const char *p) {
+ while (*p && (*p != '.'))
+ p++;
+ return p;
+}
+
+int getHexChar (char c) {
+ if ((c >= '0') && (c <= '9'))
+ return c - '0';
+
+ if ((c >= 'a') && (c <= 'f'))
+ return 10 + (c - 'a');
+
+ if ((c >= 'A') && (c <= 'F'))
+ return 10 + (c - 'A');
+
+ return -1;
+}
+
+bool parseFromHex(const char* start, const char* end, std::string& target)
+{
+ const char *p = start;
+
+ while ((p + 1) < end) {
+ int nibble = getHexChar(*p++);
+ if (nibble < 0)
+ return false;
+ int byte = (nibble << 4);
+ nibble = getHexChar(*p++);
+ if (nibble < 0)
+ return false;
+ byte += nibble;
+ target.append (1, (char) byte & 0xFF);
+ }
+ return (p == end);
+}
+
+
+// parse string from AmqpConnection::DataSourcename
+// "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password"
+//
+// parse strictly and return false if the dsn is in a bad format
+
+bool parseDsn (const char *dsn, std::string& host, int& port, bool& ssl, bool& saslPlain,
+ std::string& username, std::string& password) {
+ if (dsn == NULL)
+ return false;
+
+ size_t len = strnlen(dsn, 1025);
+ if (len > 1024)
+ return false;
+
+ if (strncmp(dsn, dsnHeader, strlen(dsnHeader)))
+ return false;
+
+ const char *endp = dsn + len;
+ const char *tokenp = dsn + strlen(dsnHeader);
+ if (*tokenp != '.')
+ return false;
+
+ // port
+ tokenp++;
+ if (tokenp >= endp)
+ return false;
+ if (*tokenp == '.')
+ return false; // null port not allowed
+
+ const char *token_end = nextDot(tokenp);
+ if ((token_end - tokenp) > 5)
+ return false;
+
+ port = 0;
+ for (const char *p = tokenp; p < token_end; p++) {
+ if ((*p < '0') || (*p > '9'))
+ return false;
+ port = (10 * port) + (*p - '0');
+ }
+
+ if (port > 65535)
+ return false;
+
+ // host
+ tokenp = token_end + 1;
+ if (tokenp >= endp)
+ return false;
+ if (*tokenp == '.')
+ return false; // null host not allowed
+
+ token_end = nextDot(tokenp);
+ if (!parseFromHex(tokenp, token_end, host))
+ return false;
+
+ // skip the RM identifier, but verify it exists
+ tokenp = token_end + 1;
+ if (tokenp >= endp)
+ return false;
+ token_end = nextDot (tokenp);
+ if ((token_end - tokenp) < 3)
+ return false;
+
+ // ssl: look for T or F
+ tokenp = token_end + 1;
+ if (tokenp >= endp)
+ return false;
+ if (*tokenp == 'T')
+ ssl = true;
+ else if (*tokenp == 'F')
+ ssl = false;
+ else
+ return false;
+ if (*++tokenp != '.')
+ return false;
+
+ // sasl mechanism: A = anonymous, P = plain. More to come...
+ ++tokenp;
+ if (tokenp >= endp)
+ return false;
+ if (*(tokenp+1) != '.')
+ return false;
+
+ if (*tokenp == 'A') {
+ saslPlain = false;
+ tokenp += 2;
+ // no auth tokens
+ }
+ else if (*tokenp == 'P') {
+ saslPlain = true;
+ tokenp += 2;
+ if (tokenp >= endp)
+ return false;
+ token_end = nextDot (tokenp);
+ if (!parseFromHex(tokenp, token_end, username))
+ return false;
+ tokenp = token_end + 1;
+
+ if (tokenp >= endp)
+ return false;
+ token_end = nextDot (tokenp);
+ if (!parseFromHex(tokenp, token_end, password))
+ return false;
+ tokenp = token_end + 1;
+ }
+ else
+ return false;
+
+ return (tokenp == endp);
+}
+
+
+
+INT ResourceManager::open() {
+ INT rv = XAER_RMERR; // placeholder until we successfully connect to resource
+ active = true;
+ LeaveCriticalSection(&rmLock);
+
+ try {
+ ConnectionSettings settings;
+ settings.host = this->host;
+ settings.port = this->port;
+
+
+ if (ssl)
+ settings.protocol = "ssl";
+
+ if (saslPlain) {
+ settings.username = this->username;
+ settings.password = this->password;
+ settings.mechanism = "PLAIN";
+ }
+
+ qpidConnection.open(settings);
+ qpidSession = qpidConnection.newSession();
+ rv = XA_OK;
+/*
+TODO: logging
+ } catch (const qpid::Exception& error) {
+ // log it
+ } catch (const std::exception& e2) {
+ // log it
+*/
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+ return rv;
+}
+
+
+INT ResourceManager::close() {
+ // should never be called when already sending other commands to broker
+ if (active)
+ return XAER_PROTO;
+
+ INT rv = XAER_RMERR; // placeholder until we successfully close resource
+ active = true;
+ LeaveCriticalSection(&rmLock);
+ try {
+ if (qpidSession.isValid()) {
+ qpidSession.close();
+ }
+ if (qpidConnection.isOpen()) {
+ qpidConnection.close();
+ }
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+
+ if (!qpidConnection.isOpen()) {
+ rv = XA_OK;
+ }
+ return rv;
+}
+
+
+INT ResourceManager::commit(XID *xid) {
+ if (active)
+ return XAER_PROTO;
+
+ INT rv = XAER_RMFAIL;
+ active = true;
+ LeaveCriticalSection(&rmLock);
+
+ try {
+ qpid::framing::Xid qpidXid;
+ XaToQpid(*xid, qpidXid);
+
+ XaResult xaResult = qpidSession.dtxCommit(qpidXid, false, true);
+ if (xaResult.hasStatus()) {
+ uint16_t status = xaResult.getStatus();
+ switch ((XaStatus) status) {
+ case XA_STATUS_XA_OK:
+ case XA_STATUS_XA_RDONLY:
+ case XA_STATUS_XA_HEURCOM:
+ rv = XA_OK;
+ break;
+
+ default:
+ // commit failed and a retry won't fix
+ rv = XAER_RMERR;
+ break;
+ }
+
+ }
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+ return rv;
+}
+
+
+INT ResourceManager::rollback(XID *xid) {
+ if (active)
+ return XAER_PROTO;
+
+ INT rv = XAER_RMFAIL;
+ active = true;
+ LeaveCriticalSection(&rmLock);
+
+ try {
+ qpid::framing::Xid qpidXid;
+ XaToQpid(*xid, qpidXid);
+
+ XaResult xaResult = qpidSession.dtxRollback(qpidXid, true);
+ if (xaResult.hasStatus()) {
+ uint16_t status = xaResult.getStatus();
+ switch ((XaStatus) status) {
+ case XA_STATUS_XA_OK:
+ case XA_STATUS_XA_HEURRB:
+ rv = XA_OK;
+ break;
+
+ default:
+ // RM internal error
+ rv = XA_RBPROTO;
+ break;
+ }
+ }
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+ return rv;
+}
+
+
+INT ResourceManager::recover(XID *xids, long count, long flags) {
+ if (active)
+ return XAER_PROTO;
+
+ if ((xids == NULL) && (count != 0))
+ return XAER_INVAL;
+
+ if (count < 0)
+ return XAER_INVAL;
+
+ if (!(flags & TMSTARTRSCAN) && (cursor == -1))
+ // no existing scan and no scan requested
+ return XAER_INVAL;
+
+ INT status = XA_OK;
+
+ if (flags & TMSTARTRSCAN) {
+ // start a fresh scan
+ cursor = -1;
+ inDoubtXids.clear();
+ active = true;
+ LeaveCriticalSection(&rmLock);
+
+ try {
+ // status if we can't talk to the broker
+ status = XAER_RMFAIL;
+ std::vector<std::string> wireFormatXids;
+
+ DtxRecoverResult dtxrr = qpidSession.dtxRecover(true);
+
+ // status if we can't process the xids
+ status = XAER_RMERR;
+ dtxrr.getInDoubt().collect(wireFormatXids);
+ size_t nXids = wireFormatXids.size();
+
+ if (nXids > 0) {
+ StructHelper decoder;
+ Xid qpidXid;
+ for (size_t i = 0; i < nXids; i++) {
+ decoder.decode (qpidXid, wireFormatXids[i]);
+ inDoubtXids.push_back(qpidXid);
+ }
+
+ // if we got here the decoder validated the Xids
+ status = XA_OK;
+
+ // make sure none are too big, just in case
+
+ for (size_t i = 0; i < nXids; i++) {
+ Xid& xid = inDoubtXids[i];
+ size_t l1 = xid.hasGlobalId() ? xid.getGlobalId().size() : 0;
+ size_t l2 = xid.hasBranchId() ? xid.getBranchId().size() : 0;
+ if ((l1 > MAXGTRIDSIZE) || (l2 > MAXBQUALSIZE) ||
+ ((l1 + l2) > XIDDATASIZE)) {
+ status = XAER_RMERR;
+ break;
+ }
+ }
+ }
+ else {
+ // nXids == 0, the previously cleared inDoubtXids is correctly populated
+ status = XA_OK;
+ }
+
+ if (status == XA_OK)
+ cursor = 0;
+ } catch (...) {
+ // TODO: log it
+ }
+
+ EnterCriticalSection(&rmLock);
+ active = false;
+ }
+ else {
+ // TMSTARTRSCAN not set, is there an existing scan to work from?
+ if (cursor == -1)
+ return XAER_INVAL;
+ }
+
+ if (status != XA_OK)
+ return status;
+
+ INT actualCount = count;
+ if (count > 0) {
+ int nAvailable = (int) inDoubtXids.size() - cursor;
+ if (nAvailable < count)
+ actualCount = nAvailable;
+
+ for (int i = 0; i < actualCount; i++) {
+ Xid& qpidXid = inDoubtXids[i + cursor];
+ QpidToXa(qpidXid, xids[i]);
+ }
+ }
+
+ if (flags & TMENDRSCAN) {
+ cursor = -1;
+ inDoubtXids.clear();
+ }
+
+ return actualCount;
+}
+
+
+// Call with lock held
+
+ResourceManager* findRm(int rmid) {
+ if (rmMap.find(rmid) == rmMap.end()) {
+ return NULL;
+ }
+ return rmMap[rmid];
+}
+
+
+INT __cdecl xa_open (char *xa_info, int rmid, long flags) {
+ if (flags & TMASYNC)
+ return XAER_ASYNC;
+
+ INT rv = XAER_RMERR;
+ EnterCriticalSection(&rmLock);
+
+ ResourceManager* rmp = findRm(rmid);
+ if (rmp != NULL) {
+ // error: already in use
+ rv = XAER_PROTO;
+ }
+ else {
+ std::string brokerHost;
+ int brokerPort;
+ std::string username;
+ std::string password;
+ bool ssl;
+ bool saslPlain;
+
+ if (parseDsn(xa_info, brokerHost, brokerPort, ssl, saslPlain, username, password)) {
+
+ try {
+ rmp = new ResourceManager(rmid, brokerHost, brokerPort, ssl, saslPlain, username, password);
+
+ rv = rmp->open();
+ if (rv != XA_OK) {
+ delete (rmp);
+ }
+ else {
+ rmMap[rmid] = rmp;
+ }
+ } catch (...) {}
+ }
+ else {
+ rv = XAER_INVAL;
+ }
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_close (char *xa_info, int rmid, long flags) {
+ if (flags & TMASYNC)
+ return XAER_ASYNC;
+
+ INT rv = XAER_RMERR;
+
+ EnterCriticalSection(&rmLock);
+ ResourceManager* rmp = findRm(rmid);
+
+ if (rmp == NULL) {
+ // can close multiple times
+ rv = XA_OK;
+ }
+ else {
+ rv = rmp->close();
+ rmMap.erase(rmid);
+ try {
+ delete (rmp);
+ } catch (...) {
+ // TODO: log it
+ }
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_commit (XID *xid, int rmid, long flags) {
+ if (flags & TMASYNC)
+ return XAER_ASYNC;
+
+ INT rv = XAER_RMFAIL;
+
+ EnterCriticalSection(&rmLock);
+ ResourceManager* rmp = findRm(rmid);
+
+ if (rmp == NULL) {
+ rv = XAER_INVAL;
+ }
+ else {
+ rv = rmp->commit(xid);
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_rollback (XID *xid, int rmid, long flags) {
+ if (flags & TMASYNC)
+ return XAER_ASYNC;
+
+ INT rv = XAER_RMFAIL;
+
+ EnterCriticalSection(&rmLock);
+ ResourceManager* rmp = findRm(rmid);
+
+ if (rmp == NULL) {
+ rv = XAER_INVAL;
+ }
+ else {
+ rv = rmp->rollback(xid);
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_recover (XID *xids, long count, int rmid, long flags) {
+ INT rv = XAER_RMFAIL;
+
+ EnterCriticalSection(&rmLock);
+ ResourceManager* rmp = findRm(rmid);
+
+ if (rmp == NULL) {
+ rv = XAER_PROTO;
+ }
+ else {
+ rv = rmp->recover(xids, count, flags);
+ }
+
+ LeaveCriticalSection(&rmLock);
+ return rv;
+}
+
+
+INT __cdecl xa_start (XID *xid, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+INT __cdecl xa_end (XID *xid, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+INT __cdecl xa_prepare (XID *xid, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+INT __cdecl xa_forget (XID *xid, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+INT __cdecl xa_complete (int *handle, int *retval, int rmid, long flags) {
+ // not used in recovery
+ return XAER_PROTO;
+}
+
+
+
+xa_switch_t xaSwitch;
+
+HRESULT __cdecl GetQpidXaSwitch (DWORD XaSwitchFlags, xa_switch_t ** ppXaSwitch)
+{
+ // needed for now due to implicit use of FreeLibrary in WSACleanup() in qpid/cpp/src/qpid/sys/windows/Socket.cpp
+ pinDll();
+
+ if (xaSwitch.xa_open_entry != xa_open) {
+
+ xaSwitch.xa_open_entry = xa_open;
+ xaSwitch.xa_close_entry = xa_close;
+ xaSwitch.xa_start_entry = xa_start;
+ xaSwitch.xa_end_entry = xa_end;
+ xaSwitch.xa_prepare_entry = xa_prepare;
+ xaSwitch.xa_commit_entry = xa_commit;
+ xaSwitch.xa_rollback_entry = xa_rollback;
+ xaSwitch.xa_recover_entry = xa_recover;
+ xaSwitch.xa_forget_entry = xa_forget;
+ xaSwitch.xa_complete_entry = xa_complete;
+
+ strcpy_s(xaSwitch.name, RMNAMESZ, "qpidxarm");
+ xaSwitch.flags = TMNOMIGRATE;
+ xaSwitch.version = 0;
+ }
+ *ppXaSwitch = &xaSwitch;
+ return S_OK;
+}
+
+
+
+
+}}} // namespace Apache::Qpid::DtcPlugin
+
+
+// GetXaSwitch
+
+extern "C" {
+
+ __declspec(dllexport) HRESULT __cdecl GetXaSwitch (DWORD XaSwitchFlags, xa_switch_t ** ppXaSwitch)
+ {
+ return Apache::Qpid::DtcPlugin::GetQpidXaSwitch (XaSwitchFlags, ppXaSwitch);
+ }
+}
+
+
+// dllmain
+
+BOOL APIENTRY DllMain( HMODULE hModule,
+ DWORD ul_reason_for_call,
+ LPVOID lpReserved)
+{
+
+ switch (ul_reason_for_call)
+ {
+ case DLL_PROCESS_ATTACH:
+ InitializeCriticalSection(&Apache::Qpid::DtcPlugin::rmLock);
+ Apache::Qpid::DtcPlugin::thisDll = hModule;
+ break;
+
+ case DLL_PROCESS_DETACH:
+ DeleteCriticalSection(&Apache::Qpid::DtcPlugin::rmLock);
+ break;
+
+ case DLL_THREAD_ATTACH:
+ case DLL_THREAD_DETACH:
+ break;
+ }
+ return TRUE;
+}
+