summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java98
1 files changed, 70 insertions, 28 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index af9048f1f5..7611c9e8de 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -17,8 +17,13 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.apache.qpid.dtx.XidImpl;
import org.apache.qpid.transport.DtxXaStatus;
@@ -28,15 +33,13 @@ import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.RecoverResult;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.XaResult;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is an implementation of javax.njms.XAResource.
*/
-public class XAResourceImpl implements XAResource
+public class XAResourceImpl implements AMQXAResource
{
/**
* this XAResourceImpl's logger
@@ -57,9 +60,11 @@ public class XAResourceImpl implements XAResource
* The time for this resource
*/
private int _timeout;
-
+
//--- constructor
-
+
+ private List<XAResource> _siblings = new ArrayList<XAResource>();
+
/**
* Create an XAResource associated with a XASession
*
@@ -157,7 +162,20 @@ public class XAResourceImpl implements XAResource
_xaSession.createSession();
convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
+
checkStatus(result.getStatus());
+
+ for(XAResource sibling: _siblings)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings");
+ }
+
+ sibling.end(xid, flag);
+ }
+
+ _siblings.clear();
}
@@ -216,28 +234,38 @@ public class XAResourceImpl implements XAResource
* @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
*/
public boolean isSameRM(XAResource xaResource) throws XAException
- {
+ {
if(this == xaResource)
{
- return true;
- }
- if(!(xaResource instanceof XAResourceImpl))
+ return true;
+ }
+
+ if(!(xaResource instanceof AMQXAResource))
{
- return false;
+ return false;
}
-
- XAResourceImpl other = (XAResourceImpl)xaResource;
- String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
- String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
-
+ String myUUID = getBrokerUUID();
+ String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID();
+
if(_logger.isDebugEnabled())
{
_logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
}
-
- return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
-
+
+ boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
+
+ if(isSameRm)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. ");
+ }
+ _siblings.add(xaResource);
+ }
+
+ return isSameRm;
+
}
/**
@@ -369,12 +397,12 @@ public class XAResourceImpl implements XAResource
{
_timeout = timeout;
if (timeout != _timeout && _xid != null)
- {
+ {
setDtxTimeout(_timeout);
}
return true;
}
-
+
private void setDtxTimeout(int timeout) throws XAException
{
_xaSession.getQpidSession()
@@ -437,18 +465,23 @@ public class XAResourceImpl implements XAResource
{
setDtxTimeout(_timeout);
}
+
+ for(XAResource sibling: _siblings)
+ {
+ sibling.start(xid, flag);
+ }
}
/**
* Is this resource currently enlisted in a transaction?
- *
+ *
* @return true if the resource is associated with a transaction, false otherwise.
*/
public boolean isEnlisted()
{
return (_xid != null) ;
}
-
+
//------------------------------------------------------------------------
// Private methods
//------------------------------------------------------------------------
@@ -517,7 +550,7 @@ public class XAResourceImpl implements XAResource
}
catch (XAException e)
{
- e.printStackTrace();
+ _logger.error(e.getMessage(), e);
throw e;
}
case ILLEGAL_STATE:
@@ -544,7 +577,7 @@ public class XAResourceImpl implements XAResource
* convert a generic xid into qpid format
* @param xid xid to be converted
* @return the qpid formated xid
- * @throws XAException when xid is null
+ * @throws XAException when xid is null
*/
private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException
{
@@ -556,4 +589,13 @@ public class XAResourceImpl implements XAResource
return XidImpl.convert(xid);
}
+ public String getBrokerUUID()
+ {
+ return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
+ }
+
+ public List<XAResource> getSiblings()
+ {
+ return Collections.unmodifiableList(_siblings);
+ }
}