diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java | 98 |
1 files changed, 70 insertions, 28 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index af9048f1f5..7611c9e8de 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -17,8 +17,13 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import org.apache.qpid.dtx.XidImpl; import org.apache.qpid.transport.DtxXaStatus; @@ -28,15 +33,13 @@ import org.apache.qpid.transport.Option; import org.apache.qpid.transport.RecoverResult; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.XaResult; - -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is an implementation of javax.njms.XAResource. */ -public class XAResourceImpl implements XAResource +public class XAResourceImpl implements AMQXAResource { /** * this XAResourceImpl's logger @@ -57,9 +60,11 @@ public class XAResourceImpl implements XAResource * The time for this resource */ private int _timeout; - + //--- constructor - + + private List<XAResource> _siblings = new ArrayList<XAResource>(); + /** * Create an XAResource associated with a XASession * @@ -157,7 +162,20 @@ public class XAResourceImpl implements XAResource _xaSession.createSession(); convertExecutionErrorToXAErr(e.getException().getErrorCode()); } + checkStatus(result.getStatus()); + + for(XAResource sibling: _siblings) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings"); + } + + sibling.end(xid, flag); + } + + _siblings.clear(); } @@ -216,28 +234,38 @@ public class XAResourceImpl implements XAResource * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. */ public boolean isSameRM(XAResource xaResource) throws XAException - { + { if(this == xaResource) { - return true; - } - if(!(xaResource instanceof XAResourceImpl)) + return true; + } + + if(!(xaResource instanceof AMQXAResource)) { - return false; + return false; } - - XAResourceImpl other = (XAResourceImpl)xaResource; - String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); - String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID(); - + String myUUID = getBrokerUUID(); + String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID(); + if(_logger.isDebugEnabled()) { _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID); } - - return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); - + + boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); + + if(isSameRm) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. "); + } + _siblings.add(xaResource); + } + + return isSameRm; + } /** @@ -369,12 +397,12 @@ public class XAResourceImpl implements XAResource { _timeout = timeout; if (timeout != _timeout && _xid != null) - { + { setDtxTimeout(_timeout); } return true; } - + private void setDtxTimeout(int timeout) throws XAException { _xaSession.getQpidSession() @@ -437,18 +465,23 @@ public class XAResourceImpl implements XAResource { setDtxTimeout(_timeout); } + + for(XAResource sibling: _siblings) + { + sibling.start(xid, flag); + } } /** * Is this resource currently enlisted in a transaction? - * + * * @return true if the resource is associated with a transaction, false otherwise. */ public boolean isEnlisted() { return (_xid != null) ; } - + //------------------------------------------------------------------------ // Private methods //------------------------------------------------------------------------ @@ -517,7 +550,7 @@ public class XAResourceImpl implements XAResource } catch (XAException e) { - e.printStackTrace(); + _logger.error(e.getMessage(), e); throw e; } case ILLEGAL_STATE: @@ -544,7 +577,7 @@ public class XAResourceImpl implements XAResource * convert a generic xid into qpid format * @param xid xid to be converted * @return the qpid formated xid - * @throws XAException when xid is null + * @throws XAException when xid is null */ private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException { @@ -556,4 +589,13 @@ public class XAResourceImpl implements XAResource return XidImpl.convert(xid); } + public String getBrokerUUID() + { + return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); + } + + public List<XAResource> getSiblings() + { + return Collections.unmodifiableList(_siblings); + } } |