diff options
6 files changed, 247 insertions, 41 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java new file mode 100644 index 0000000000..cce6b91781 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java @@ -0,0 +1,32 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.util.List; + +import javax.transaction.xa.XAResource; + +public interface AMQXAResource extends XAResource +{ + public String getBrokerUUID(); + + public List<XAResource> getSiblings(); +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index af9048f1f5..7611c9e8de 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -17,8 +17,13 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import org.apache.qpid.dtx.XidImpl; import org.apache.qpid.transport.DtxXaStatus; @@ -28,15 +33,13 @@ import org.apache.qpid.transport.Option; import org.apache.qpid.transport.RecoverResult; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.XaResult; - -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is an implementation of javax.njms.XAResource. */ -public class XAResourceImpl implements XAResource +public class XAResourceImpl implements AMQXAResource { /** * this XAResourceImpl's logger @@ -57,9 +60,11 @@ public class XAResourceImpl implements XAResource * The time for this resource */ private int _timeout; - + //--- constructor - + + private List<XAResource> _siblings = new ArrayList<XAResource>(); + /** * Create an XAResource associated with a XASession * @@ -157,7 +162,20 @@ public class XAResourceImpl implements XAResource _xaSession.createSession(); convertExecutionErrorToXAErr(e.getException().getErrorCode()); } + checkStatus(result.getStatus()); + + for(XAResource sibling: _siblings) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings"); + } + + sibling.end(xid, flag); + } + + _siblings.clear(); } @@ -216,28 +234,38 @@ public class XAResourceImpl implements XAResource * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. */ public boolean isSameRM(XAResource xaResource) throws XAException - { + { if(this == xaResource) { - return true; - } - if(!(xaResource instanceof XAResourceImpl)) + return true; + } + + if(!(xaResource instanceof AMQXAResource)) { - return false; + return false; } - - XAResourceImpl other = (XAResourceImpl)xaResource; - String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); - String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID(); - + String myUUID = getBrokerUUID(); + String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID(); + if(_logger.isDebugEnabled()) { _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID); } - - return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); - + + boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); + + if(isSameRm) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. "); + } + _siblings.add(xaResource); + } + + return isSameRm; + } /** @@ -369,12 +397,12 @@ public class XAResourceImpl implements XAResource { _timeout = timeout; if (timeout != _timeout && _xid != null) - { + { setDtxTimeout(_timeout); } return true; } - + private void setDtxTimeout(int timeout) throws XAException { _xaSession.getQpidSession() @@ -437,18 +465,23 @@ public class XAResourceImpl implements XAResource { setDtxTimeout(_timeout); } + + for(XAResource sibling: _siblings) + { + sibling.start(xid, flag); + } } /** * Is this resource currently enlisted in a transaction? - * + * * @return true if the resource is associated with a transaction, false otherwise. */ public boolean isEnlisted() { return (_xid != null) ; } - + //------------------------------------------------------------------------ // Private methods //------------------------------------------------------------------------ @@ -517,7 +550,7 @@ public class XAResourceImpl implements XAResource } catch (XAException e) { - e.printStackTrace(); + _logger.error(e.getMessage(), e); throw e; } case ILLEGAL_STATE: @@ -544,7 +577,7 @@ public class XAResourceImpl implements XAResource * convert a generic xid into qpid format * @param xid xid to be converted * @return the qpid formated xid - * @throws XAException when xid is null + * @throws XAException when xid is null */ private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException { @@ -556,4 +589,13 @@ public class XAResourceImpl implements XAResource return XidImpl.convert(xid); } + public String getBrokerUUID() + { + return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); + } + + public List<XAResource> getSiblings() + { + return Collections.unmodifiableList(_siblings); + } } diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java index 22b39792b1..37ae7f5514 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java @@ -21,10 +21,13 @@ package org.apache.qpid.ra; +import java.util.List; + import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import org.apache.qpid.client.AMQXAResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +35,7 @@ import org.slf4j.LoggerFactory; * QpidRAXAResource. * */ -public class QpidRAXAResource implements XAResource +public class QpidRAXAResource implements AMQXAResource { /** The logger */ private static final Logger _log = LoggerFactory.getLogger(QpidRAXAResource.class); @@ -192,7 +195,7 @@ public class QpidRAXAResource implements XAResource { _log.trace("isSameRM(" + xaRes + ")"); } - + return _xaResource.isSameRM(xaRes); } @@ -242,4 +245,14 @@ public class QpidRAXAResource implements XAResource return _xaResource.setTransactionTimeout(seconds); } + + public String getBrokerUUID() + { + return ((AMQXAResource)_xaResource).getBrokerUUID(); + } + + public List<XAResource> getSiblings() + { + return ((AMQXAResource)_xaResource).getSiblings(); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java index ac29b72620..e18f70b01d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java @@ -24,21 +24,72 @@ import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.util.FileUtils; +import org.apache.qpid.test.unit.xa.AbstractXATestCase; +import org.apache.qpid.client.AMQXAResource; + +import org.apache.qpid.dtx.XidImpl; import javax.jms.XAConnection; import javax.jms.XAConnectionFactory; import javax.jms.XASession; +import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; -public class XAResourceTest extends QpidBrokerTestCase +public class XAResourceTest extends AbstractXATestCase { private static final String FACTORY_NAME = "default"; private static final String ALT_FACTORY_NAME = "connection2"; + public void init() throws Exception + { + } + + public void testIsSameRMJoin() throws Exception + { + XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME); + XAConnection conn1 = factory.createXAConnection("guest", "guest"); + XAConnection conn2 = factory.createXAConnection("guest", "guest"); + XAConnection conn3 = factory.createXAConnection("guest", "guest"); + + XASession session1 = conn1.createXASession(); + XASession session2 = conn2.createXASession(); + XASession session3 = conn3.createXASession(); + + AMQXAResource xaResource1 = (AMQXAResource)session1.getXAResource(); + AMQXAResource xaResource2 = (AMQXAResource)session2.getXAResource(); + AMQXAResource xaResource3 = (AMQXAResource)session3.getXAResource(); + + Xid xid = getNewXid(); + + xaResource1.start(xid, XAResource.TMNOFLAGS); + assertTrue("XAResource isSameRM", xaResource1.isSameRM(xaResource2)); + xaResource2.start(xid, XAResource.TMJOIN); + assertTrue("AMQXAResource siblings should be 1", xaResource1.getSiblings().size() == 1); + + assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource2.getSiblings().size() == 0); + + assertTrue("XAResource isSameRM", xaResource2.isSameRM(xaResource3)); + + + xaResource3.start(xid, XAResource.TMJOIN); + assertTrue("AMQXAResource siblings should be 1", xaResource2.getSiblings().size() == 1); + + xaResource1.end(xid, XAResource.TMSUCCESS); + assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource1.getSiblings().size() == 0); + + xaResource1.prepare(xid); + xaResource1.commit(xid, false); + + conn3.close(); + conn2.close(); + conn1.close(); + } + /* * Test with multiple XAResources originating from the same connection factory. XAResource(s) will be equal, - * as they originate from the same session. + * as they originate from the same session. */ public void testIsSameRMSingleCF() throws Exception { @@ -47,14 +98,14 @@ public class XAResourceTest extends QpidBrokerTestCase XASession session = conn.createXASession(); XAResource xaResource1 = session.getXAResource(); XAResource xaResource2 = session.getXAResource(); - + assertEquals("XAResource objects not equal", xaResource1, xaResource2); assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2)); - + session.close(); conn.close(); } - + /* * Test with multiple XAResources originating from different connection factory's and different sessions. XAResources will not be * equal as they do not originate from the same session. As the UUID from the broker will be the same, isSameRM will be true. @@ -67,11 +118,11 @@ public class XAResourceTest extends QpidBrokerTestCase XAConnectionFactory factory = new AMQConnectionFactory(url); XAConnectionFactory factory2 = new AMQConnectionFactory(url); XAConnectionFactory factory3 = getConnectionFactory(ALT_FACTORY_NAME); - + XAConnection conn = factory.createXAConnection("guest","guest"); XAConnection conn2 = factory2.createXAConnection("guest","guest"); XAConnection conn3 = factory3.createXAConnection("guest","guest"); - + XASession session = conn.createXASession(); XASession session2 = conn2.createXASession(); XASession session3 = conn3.createXASession(); @@ -79,14 +130,14 @@ public class XAResourceTest extends QpidBrokerTestCase XAResource xaResource1 = session.getXAResource(); XAResource xaResource2 = session2.getXAResource(); XAResource xaResource3 = session3.getXAResource(); - + assertFalse("XAResource objects should not be equal", xaResource1.equals(xaResource2)); assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2)); assertFalse("isSameRM true for XA Resources created by two different brokers", xaResource1.isSameRM(xaResource3)); - + conn.close(); conn2.close(); - conn3.close(); + conn3.close(); } @Override @@ -103,5 +154,5 @@ public class XAResourceTest extends QpidBrokerTestCase FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort()); } } - + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java new file mode 100644 index 0000000000..c8116d8cef --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ra; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.Session; + +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.jms.XASession; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; + +import org.apache.qpid.client.AMQXAResource; + +import org.apache.qpid.ra.QpidRAConnectionFactoryImpl; +import org.apache.qpid.ra.QpidRAManagedConnectionFactory; +import org.apache.qpid.ra.QpidResourceAdapter; + +public class QpidRAXAResourceTest extends QpidBrokerTestCase +{ + private static final String FACTORY_NAME = "default"; + private static final String BROKER_PORT = "15672"; + private static final String URL = "amqp://guest:guest@client/test?brokerlist='tcp://localhost:" + BROKER_PORT + "?sasl_mechs='PLAIN''"; + + public void testXAResourceIsSameRM() throws Exception + { + QpidResourceAdapter ra = new QpidResourceAdapter(); + QpidRAManagedConnectionFactory mcf = new QpidRAManagedConnectionFactory(); + mcf.setConnectionURL(URL); + mcf.setResourceAdapter(ra); + QpidRAManagedConnection mc = (QpidRAManagedConnection)mcf.createManagedConnection(null, null); + AMQXAResource xa1 = (AMQXAResource)mc.getXAResource(); + + XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME); + XAConnection connection = factory.createXAConnection("guest", "guest"); + XASession s2 = connection.createXASession(); + AMQXAResource xaResource = (AMQXAResource)connection.createXASession().getXAResource(); + + assertTrue("QpidRAXAResource and XAResource should be from the same RM", xa1.isSameRM(xaResource)); + assertTrue("XAResource and QpidRAXAResource should be from the same RM", xaResource.isSameRM(xa1)); + + } + +} diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index 18320646ee..0546ce92d3 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -66,4 +66,5 @@ org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSel // JCA system tests require XA support (should look to see if we can reduce scope of excludes here) org.apache.qpid.ra.QpidRAConnectionTest#* +org.apache.qpid.ra.QpidRAXAResourceTest#* |