summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java522
1 files changed, 522 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
new file mode 100644
index 0000000000..8a75082202
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -0,0 +1,522 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.dtx.XidImpl;
+import org.apache.qpid.transport.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an implementation of javax.njms.XAResource.
+ */
+public class XAResourceImpl implements XAResource
+{
+ /**
+ * this XAResourceImpl's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class);
+
+ /**
+ * Reference to the associated XASession
+ */
+ private XASessionImpl _xaSession = null;
+
+ /**
+ * The XID of this resource
+ */
+ private Xid _xid;
+
+ /**
+ * The time for this resource
+ */
+ private int _timeout;
+
+ //--- constructor
+
+ /**
+ * Create an XAResource associated with a XASession
+ *
+ * @param xaSession The session XAresource
+ */
+ protected XAResourceImpl(XASessionImpl xaSession)
+ {
+ _xaSession = xaSession;
+ }
+
+ //--- The XAResource
+ /**
+ * Commits the global transaction specified by xid.
+ *
+ * @param xid A global transaction identifier
+ * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid.
+ * @throws XAException An error has occurred. An error has occurred. Possible XAExceptions are XA_HEURHAZ,
+ * XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+ */
+ public void commit(Xid xid, boolean b) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("commit tx branch with xid: ", xid);
+ }
+ Future<XaResult> future =
+ _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
+
+ // now wait on the future for the result
+ XaResult result = null;
+ try
+ {
+ result = future.get();
+ }
+ catch (SessionException e)
+ {
+ // we need to restore the qpid session that has been closed
+ _xaSession.createSession();
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
+ }
+ finally
+ {
+ _xid = null;
+ }
+ checkStatus(result.getStatus());
+ }
+
+ /**
+ * Ends the work performed on behalf of a transaction branch.
+ * The resource manager disassociates the XA resource from the transaction branch specified
+ * and lets the transaction complete.
+ * <ul>
+ * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state.
+ * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified.
+ * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only
+ * <li> If TMSUCCESS is specified, the portion of work has completed successfully.
+ * /ul>
+ *
+ * @param xid A global transaction identifier that is the same as the identifier used previously in the start method
+ * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+ * @throws XAException An error has occurred. An error has occurred. Possible XAException values are XAER_RMERR,
+ * XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*.
+ */
+ public void end(Xid xid, int flag) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("end tx branch with xid: ", xid);
+ }
+ switch (flag)
+ {
+ case(XAResource.TMSUCCESS):
+ break;
+ case(XAResource.TMFAIL):
+ break;
+ case(XAResource.TMSUSPEND):
+ break;
+ default:
+ throw new XAException(XAException.XAER_INVAL);
+ }
+ _xaSession.flushAcknowledgments();
+ Future<XaResult> future = _xaSession.getQpidSession()
+ .dtxEnd(convertXid(xid),
+ flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,
+ flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE);
+ // now wait on the future for the result
+ XaResult result = null;
+ try
+ {
+ result = future.get();
+ }
+ catch (SessionException e)
+ {
+ // we need to restore the qpid session that has been closed
+ _xaSession.createSession();
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
+ }
+ checkStatus(result.getStatus());
+ }
+
+
+ /**
+ * Tells the resource manager to forget about a heuristically completed transaction branch.
+ *
+ * @param xid String(xid.getGlobalTransactionId() A global transaction identifier
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL,
+ * XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+ */
+ public void forget(Xid xid) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("forget tx branch with xid: ", xid);
+ }
+ _xaSession.getQpidSession().dtxForget(convertXid(xid));
+ try
+ {
+ _xaSession.getQpidSession().sync();
+ }
+ catch (SessionException e)
+ {
+ // we need to restore the qpid session that has been closed
+ _xaSession.createSession();
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
+ }
+ finally
+ {
+ _xid = null;
+ }
+ }
+
+
+ /**
+ * Obtains the current transaction timeout value set for this XAResource instance.
+ * If XAResource.setTransactionTimeout was not used prior to invoking this method,
+ * the return value is the default timeout i.e. 0;
+ *
+ * @return The transaction timeout value in seconds.
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
+ */
+ public int getTransactionTimeout() throws XAException
+ {
+ return _timeout;
+ }
+
+ /**
+ * This method is called to determine if the resource manager instance represented
+ * by the target object is the same as the resouce manager instance represented by
+ * the parameter xaResource.
+ *
+ * @param xaResource An XAResource object whose resource manager instance is to
+ * be compared with the resource manager instance of the target object
+ * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>.
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
+ */
+ public boolean isSameRM(XAResource xaResource) throws XAException
+ {
+ // TODO : get the server identity of xaResource and compare it with our own one
+ return false;
+ }
+
+ /**
+ * Prepare for a transaction commit of the transaction specified in <code>Xid</code>.
+ *
+ * @param xid A global transaction identifier.
+ * @return A value indicating the resource manager's vote on the outcome of the transaction.
+ * The possible values are: XA_RDONLY or XA_OK.
+ * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA
+ */
+ public int prepare(Xid xid) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("prepare ", xid);
+ }
+ Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid));
+ XaResult result = null;
+ try
+ {
+ result = future.get();
+ }
+ catch (SessionException e)
+ {
+ // we need to restore the qpid session that has been closed
+ _xaSession.createSession();
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
+ }
+ DtxXaStatus status = result.getStatus();
+ int outcome = XAResource.XA_OK;
+ switch (status)
+ {
+ case XA_OK:
+ break;
+ case XA_RDONLY:
+ outcome = XAResource.XA_RDONLY;
+ break;
+ default:
+ checkStatus(status);
+ }
+ return outcome;
+ }
+
+ /**
+ * Obtains a list of prepared transaction branches.
+ * <p/>
+ * The transaction manager calls this method during recovery to obtain the list of transaction branches
+ * that are currently in prepared or heuristically completed states.
+ *
+ * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS.
+ * TMNOFLAGS must be used when no other flags are set in the parameter.
+ * @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically
+ * completed state.
+ * @throws XAException An error has occurred. Possible value is XAER_INVAL.
+ */
+ public Xid[] recover(int flag) throws XAException
+ {
+ // the flag is ignored
+ Future<RecoverResult> future = _xaSession.getQpidSession().dtxRecover();
+ RecoverResult res = null;
+ try
+ {
+ res = future.get();
+ }
+ catch (SessionException e)
+ {
+ // we need to restore the qpid session that has been closed
+ _xaSession.createSession();
+ convertExecutionErrorToXAErr( e.getException().getErrorCode());
+ }
+ Xid[] result = new Xid[res.getInDoubt().size()];
+ int i = 0;
+ for (Object obj : res.getInDoubt())
+ {
+ org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj;
+ result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId());
+ i++;
+ }
+ return result;
+ }
+
+ /**
+ * Informs the resource manager to roll back work done on behalf of a transaction branch
+ *
+ * @param xid A global transaction identifier.
+ * @throws XAException An error has occurred.
+ */
+ public void rollback(Xid xid) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("rollback tx branch with xid: ", xid);
+ }
+
+ Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid));
+ // now wait on the future for the result
+ XaResult result = null;
+ try
+ {
+ result = future.get();
+ }
+ catch (SessionException e)
+ {
+ // we need to restore the qpid session that has been closed
+ _xaSession.createSession();
+ convertExecutionErrorToXAErr( e.getException().getErrorCode());
+ }
+ finally
+ {
+ _xid = null;
+ }
+ checkStatus(result.getStatus());
+ }
+
+ /**
+ * Sets the current transaction timeout value for this XAResource instance.
+ * Once set, this timeout value is effective until setTransactionTimeout is
+ * invoked again with a different value.
+ * To reset the timeout value to the default value used by the resource manager, set the value to zero.
+ *
+ * @param timeout The transaction timeout value in seconds.
+ * @return true if transaction timeout value is set successfully; otherwise false.
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, or XAER_INVAL.
+ */
+ public boolean setTransactionTimeout(int timeout) throws XAException
+ {
+ _timeout = timeout;
+ if (timeout != _timeout && _xid != null)
+ {
+ setDtxTimeout(_timeout);
+ }
+ return true;
+ }
+
+ private void setDtxTimeout(int timeout) throws XAException
+ {
+ _xaSession.getQpidSession()
+ .dtxSetTimeout(XidImpl.convert(_xid), timeout);
+ }
+
+ /**
+ * Starts work on behalf of a transaction branch specified in xid.
+ * <ul>
+ * <li> If TMJOIN is specified, an exception is thrown as it is not supported
+ * <li> If TMRESUME is specified, the start applies to resuming a suspended transaction specified in the parameter xid.
+ * <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid has previously been seen by the
+ * resource manager, the resource manager throws the XAException exception with XAER_DUPID error code.
+ * </ul>
+ *
+ * @param xid A global transaction identifier to be associated with the resource
+ * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
+ * @throws XAException An error has occurred. Possible exceptions
+ * are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+ */
+ public void start(Xid xid, int flag) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("start tx branch with xid: ", xid);
+ }
+ switch (flag)
+ {
+ case(XAResource.TMNOFLAGS):
+ break;
+ case(XAResource.TMJOIN):
+ break;
+ case(XAResource.TMRESUME):
+ break;
+ default:
+ throw new XAException(XAException.XAER_INVAL);
+ }
+ Future<XaResult> future = _xaSession.getQpidSession()
+ .dtxStart(convertXid(xid),
+ flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE,
+ flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE);
+ // now wait on the future for the result
+ XaResult result = null;
+ try
+ {
+ result = future.get();
+ }
+ catch (SessionException e)
+ {
+ // we need to restore the qpid session that has been closed
+ _xaSession.createSession();
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
+ // TODO: The amqp spec does not allow to make the difference
+ // between an already known XID and a wrong arguments (join and resume are set)
+ // TODO: make sure amqp addresses that
+ }
+ checkStatus(result.getStatus());
+ _xid = xid;
+ if (_timeout > 0)
+ {
+ setDtxTimeout(_timeout);
+ }
+ }
+
+ //------------------------------------------------------------------------
+ // Private methods
+ //------------------------------------------------------------------------
+
+ /**
+ * Check xa method outcome and, when required, convert the status into the corresponding xa exception
+ * @param status method status code
+ * @throws XAException corresponding XA Exception when required
+ */
+ private void checkStatus(DtxXaStatus status) throws XAException
+ {
+ switch (status)
+ {
+ case XA_OK:
+ // Do nothing this ok
+ break;
+ case XA_RBROLLBACK:
+ // The tx has been rolled back for an unspecified reason.
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case XA_RBTIMEOUT:
+ // The transaction branch took too long.
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ case XA_HEURHAZ:
+ // The transaction branch may have been heuristically completed.
+ throw new XAException(XAException.XA_HEURHAZ);
+ case XA_HEURCOM:
+ // The transaction branch has been heuristically committed.
+ throw new XAException(XAException.XA_HEURCOM);
+ case XA_HEURRB:
+ // The transaction branch has been heuristically rolled back.
+ throw new XAException(XAException.XA_HEURRB);
+ case XA_HEURMIX:
+ // The transaction branch has been heuristically committed and rolled back.
+ throw new XAException(XAException.XA_HEURMIX);
+ case XA_RDONLY:
+ // The transaction branch was read-only and has been committed.
+ throw new XAException(XAException.XA_RDONLY);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ //A resource manager error has occured in the transaction branch.
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ }
+
+ /**
+ * Convert execution error to xa exception.
+ * @param error the execution error code
+ * @throws XAException
+ */
+ private void convertExecutionErrorToXAErr(ExecutionErrorCode error) throws XAException
+ {
+ switch (error)
+ {
+ case NOT_ALLOWED:
+ // The XID already exists.
+ throw new XAException(XAException.XAER_DUPID);
+ case NOT_FOUND:
+ // The XID is not valid.
+ try
+ {
+ throw new XAException(XAException.XAER_NOTA);
+ }
+ catch (XAException e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ case ILLEGAL_STATE:
+ // Routine was invoked in an inproper context.
+ throw new XAException(XAException.XAER_PROTO);
+ case NOT_IMPLEMENTED:
+ // the command is not implemented
+ throw new XAException(XAException.XAER_RMERR);
+ case COMMAND_INVALID:
+ // Invalid call
+ throw new XAException(XAException.XAER_INVAL);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Got unexpected error: " + error);
+ }
+ //A resource manager error has occured in the transaction branch.
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ }
+
+ /**
+ * convert a generic xid into qpid format
+ * @param xid xid to be converted
+ * @return the qpid formated xid
+ * @throws XAException when xid is null
+ */
+ private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException
+ {
+ if (xid == null)
+ {
+ // Invalid arguments were given.
+ throw new XAException(XAException.XAER_INVAL);
+ }
+ return XidImpl.convert(xid);
+ }
+
+}