summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWeston M. Price <wprice@apache.org>2012-05-14 19:30:20 +0000
committerWeston M. Price <wprice@apache.org>2012-05-14 19:30:20 +0000
commitfa7baeaf72635628b9d2ea2ad60ba782d6313044 (patch)
tree4a6514c18814b7380463d340797e36bfca36be85
parent6bdb9fe2b2955a8ff5665ad2908136348c678383 (diff)
downloadqpid-python-fa7baeaf72635628b9d2ea2ad60ba782d6313044.tar.gz
QPID-3990: Multiple XAResources isSameRM behavior
*Track XAResource siblings in start/end methods *Added AMQXAResource interface *Added systemtest for new XAResource behavior *Refactored XAResourceTest to extend AbstractXATest git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338355 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java98
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java17
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java73
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java67
-rw-r--r--qpid/java/test-profiles/JavaPre010Excludes1
6 files changed, 247 insertions, 41 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java
new file mode 100644
index 0000000000..cce6b91781
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java
@@ -0,0 +1,32 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.List;
+
+import javax.transaction.xa.XAResource;
+
+public interface AMQXAResource extends XAResource
+{
+ public String getBrokerUUID();
+
+ public List<XAResource> getSiblings();
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index af9048f1f5..7611c9e8de 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -17,8 +17,13 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.apache.qpid.dtx.XidImpl;
import org.apache.qpid.transport.DtxXaStatus;
@@ -28,15 +33,13 @@ import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.RecoverResult;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.XaResult;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is an implementation of javax.njms.XAResource.
*/
-public class XAResourceImpl implements XAResource
+public class XAResourceImpl implements AMQXAResource
{
/**
* this XAResourceImpl's logger
@@ -57,9 +60,11 @@ public class XAResourceImpl implements XAResource
* The time for this resource
*/
private int _timeout;
-
+
//--- constructor
-
+
+ private List<XAResource> _siblings = new ArrayList<XAResource>();
+
/**
* Create an XAResource associated with a XASession
*
@@ -157,7 +162,20 @@ public class XAResourceImpl implements XAResource
_xaSession.createSession();
convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
+
checkStatus(result.getStatus());
+
+ for(XAResource sibling: _siblings)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings");
+ }
+
+ sibling.end(xid, flag);
+ }
+
+ _siblings.clear();
}
@@ -216,28 +234,38 @@ public class XAResourceImpl implements XAResource
* @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
*/
public boolean isSameRM(XAResource xaResource) throws XAException
- {
+ {
if(this == xaResource)
{
- return true;
- }
- if(!(xaResource instanceof XAResourceImpl))
+ return true;
+ }
+
+ if(!(xaResource instanceof AMQXAResource))
{
- return false;
+ return false;
}
-
- XAResourceImpl other = (XAResourceImpl)xaResource;
- String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
- String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
-
+ String myUUID = getBrokerUUID();
+ String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID();
+
if(_logger.isDebugEnabled())
{
_logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
}
-
- return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
-
+
+ boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
+
+ if(isSameRm)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. ");
+ }
+ _siblings.add(xaResource);
+ }
+
+ return isSameRm;
+
}
/**
@@ -369,12 +397,12 @@ public class XAResourceImpl implements XAResource
{
_timeout = timeout;
if (timeout != _timeout && _xid != null)
- {
+ {
setDtxTimeout(_timeout);
}
return true;
}
-
+
private void setDtxTimeout(int timeout) throws XAException
{
_xaSession.getQpidSession()
@@ -437,18 +465,23 @@ public class XAResourceImpl implements XAResource
{
setDtxTimeout(_timeout);
}
+
+ for(XAResource sibling: _siblings)
+ {
+ sibling.start(xid, flag);
+ }
}
/**
* Is this resource currently enlisted in a transaction?
- *
+ *
* @return true if the resource is associated with a transaction, false otherwise.
*/
public boolean isEnlisted()
{
return (_xid != null) ;
}
-
+
//------------------------------------------------------------------------
// Private methods
//------------------------------------------------------------------------
@@ -517,7 +550,7 @@ public class XAResourceImpl implements XAResource
}
catch (XAException e)
{
- e.printStackTrace();
+ _logger.error(e.getMessage(), e);
throw e;
}
case ILLEGAL_STATE:
@@ -544,7 +577,7 @@ public class XAResourceImpl implements XAResource
* convert a generic xid into qpid format
* @param xid xid to be converted
* @return the qpid formated xid
- * @throws XAException when xid is null
+ * @throws XAException when xid is null
*/
private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException
{
@@ -556,4 +589,13 @@ public class XAResourceImpl implements XAResource
return XidImpl.convert(xid);
}
+ public String getBrokerUUID()
+ {
+ return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
+ }
+
+ public List<XAResource> getSiblings()
+ {
+ return Collections.unmodifiableList(_siblings);
+ }
}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java
index 22b39792b1..37ae7f5514 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java
@@ -21,10 +21,13 @@
package org.apache.qpid.ra;
+import java.util.List;
+
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import org.apache.qpid.client.AMQXAResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +35,7 @@ import org.slf4j.LoggerFactory;
* QpidRAXAResource.
*
*/
-public class QpidRAXAResource implements XAResource
+public class QpidRAXAResource implements AMQXAResource
{
/** The logger */
private static final Logger _log = LoggerFactory.getLogger(QpidRAXAResource.class);
@@ -192,7 +195,7 @@ public class QpidRAXAResource implements XAResource
{
_log.trace("isSameRM(" + xaRes + ")");
}
-
+
return _xaResource.isSameRM(xaRes);
}
@@ -242,4 +245,14 @@ public class QpidRAXAResource implements XAResource
return _xaResource.setTransactionTimeout(seconds);
}
+
+ public String getBrokerUUID()
+ {
+ return ((AMQXAResource)_xaResource).getBrokerUUID();
+ }
+
+ public List<XAResource> getSiblings()
+ {
+ return ((AMQXAResource)_xaResource).getSiblings();
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
index ac29b72620..e18f70b01d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
@@ -24,21 +24,72 @@ import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.test.unit.xa.AbstractXATestCase;
+import org.apache.qpid.client.AMQXAResource;
+
+import org.apache.qpid.dtx.XidImpl;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
+import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
-public class XAResourceTest extends QpidBrokerTestCase
+public class XAResourceTest extends AbstractXATestCase
{
private static final String FACTORY_NAME = "default";
private static final String ALT_FACTORY_NAME = "connection2";
+ public void init() throws Exception
+ {
+ }
+
+ public void testIsSameRMJoin() throws Exception
+ {
+ XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME);
+ XAConnection conn1 = factory.createXAConnection("guest", "guest");
+ XAConnection conn2 = factory.createXAConnection("guest", "guest");
+ XAConnection conn3 = factory.createXAConnection("guest", "guest");
+
+ XASession session1 = conn1.createXASession();
+ XASession session2 = conn2.createXASession();
+ XASession session3 = conn3.createXASession();
+
+ AMQXAResource xaResource1 = (AMQXAResource)session1.getXAResource();
+ AMQXAResource xaResource2 = (AMQXAResource)session2.getXAResource();
+ AMQXAResource xaResource3 = (AMQXAResource)session3.getXAResource();
+
+ Xid xid = getNewXid();
+
+ xaResource1.start(xid, XAResource.TMNOFLAGS);
+ assertTrue("XAResource isSameRM", xaResource1.isSameRM(xaResource2));
+ xaResource2.start(xid, XAResource.TMJOIN);
+ assertTrue("AMQXAResource siblings should be 1", xaResource1.getSiblings().size() == 1);
+
+ assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource2.getSiblings().size() == 0);
+
+ assertTrue("XAResource isSameRM", xaResource2.isSameRM(xaResource3));
+
+
+ xaResource3.start(xid, XAResource.TMJOIN);
+ assertTrue("AMQXAResource siblings should be 1", xaResource2.getSiblings().size() == 1);
+
+ xaResource1.end(xid, XAResource.TMSUCCESS);
+ assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource1.getSiblings().size() == 0);
+
+ xaResource1.prepare(xid);
+ xaResource1.commit(xid, false);
+
+ conn3.close();
+ conn2.close();
+ conn1.close();
+ }
+
/*
* Test with multiple XAResources originating from the same connection factory. XAResource(s) will be equal,
- * as they originate from the same session.
+ * as they originate from the same session.
*/
public void testIsSameRMSingleCF() throws Exception
{
@@ -47,14 +98,14 @@ public class XAResourceTest extends QpidBrokerTestCase
XASession session = conn.createXASession();
XAResource xaResource1 = session.getXAResource();
XAResource xaResource2 = session.getXAResource();
-
+
assertEquals("XAResource objects not equal", xaResource1, xaResource2);
assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2));
-
+
session.close();
conn.close();
}
-
+
/*
* Test with multiple XAResources originating from different connection factory's and different sessions. XAResources will not be
* equal as they do not originate from the same session. As the UUID from the broker will be the same, isSameRM will be true.
@@ -67,11 +118,11 @@ public class XAResourceTest extends QpidBrokerTestCase
XAConnectionFactory factory = new AMQConnectionFactory(url);
XAConnectionFactory factory2 = new AMQConnectionFactory(url);
XAConnectionFactory factory3 = getConnectionFactory(ALT_FACTORY_NAME);
-
+
XAConnection conn = factory.createXAConnection("guest","guest");
XAConnection conn2 = factory2.createXAConnection("guest","guest");
XAConnection conn3 = factory3.createXAConnection("guest","guest");
-
+
XASession session = conn.createXASession();
XASession session2 = conn2.createXASession();
XASession session3 = conn3.createXASession();
@@ -79,14 +130,14 @@ public class XAResourceTest extends QpidBrokerTestCase
XAResource xaResource1 = session.getXAResource();
XAResource xaResource2 = session2.getXAResource();
XAResource xaResource3 = session3.getXAResource();
-
+
assertFalse("XAResource objects should not be equal", xaResource1.equals(xaResource2));
assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2));
assertFalse("isSameRM true for XA Resources created by two different brokers", xaResource1.isSameRM(xaResource3));
-
+
conn.close();
conn2.close();
- conn3.close();
+ conn3.close();
}
@Override
@@ -103,5 +154,5 @@ public class XAResourceTest extends QpidBrokerTestCase
FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
}
}
-
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java
new file mode 100644
index 0000000000..c8116d8cef
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+
+import org.apache.qpid.client.AMQXAResource;
+
+import org.apache.qpid.ra.QpidRAConnectionFactoryImpl;
+import org.apache.qpid.ra.QpidRAManagedConnectionFactory;
+import org.apache.qpid.ra.QpidResourceAdapter;
+
+public class QpidRAXAResourceTest extends QpidBrokerTestCase
+{
+ private static final String FACTORY_NAME = "default";
+ private static final String BROKER_PORT = "15672";
+ private static final String URL = "amqp://guest:guest@client/test?brokerlist='tcp://localhost:" + BROKER_PORT + "?sasl_mechs='PLAIN''";
+
+ public void testXAResourceIsSameRM() throws Exception
+ {
+ QpidResourceAdapter ra = new QpidResourceAdapter();
+ QpidRAManagedConnectionFactory mcf = new QpidRAManagedConnectionFactory();
+ mcf.setConnectionURL(URL);
+ mcf.setResourceAdapter(ra);
+ QpidRAManagedConnection mc = (QpidRAManagedConnection)mcf.createManagedConnection(null, null);
+ AMQXAResource xa1 = (AMQXAResource)mc.getXAResource();
+
+ XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME);
+ XAConnection connection = factory.createXAConnection("guest", "guest");
+ XASession s2 = connection.createXASession();
+ AMQXAResource xaResource = (AMQXAResource)connection.createXASession().getXAResource();
+
+ assertTrue("QpidRAXAResource and XAResource should be from the same RM", xa1.isSameRM(xaResource));
+ assertTrue("XAResource and QpidRAXAResource should be from the same RM", xaResource.isSameRM(xa1));
+
+ }
+
+}
diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes
index 18320646ee..0546ce92d3 100644
--- a/qpid/java/test-profiles/JavaPre010Excludes
+++ b/qpid/java/test-profiles/JavaPre010Excludes
@@ -66,4 +66,5 @@ org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSel
// JCA system tests require XA support (should look to see if we can reduce scope of excludes here)
org.apache.qpid.ra.QpidRAConnectionTest#*
+org.apache.qpid.ra.QpidRAXAResourceTest#*