diff options
10 files changed, 930 insertions, 36 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 86b4807069..1728ab69f7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -153,7 +153,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect protected final ExecutorService _taskPool = Executors.newCachedThreadPool(); protected static final long DEFAULT_TIMEOUT = 1000 * 30; - private AMQConnectionDelegate _delegate; + protected AMQConnectionDelegate _delegate; /** * @param broker brokerdetails diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index eccca87dc2..07bd7ea0ae 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -23,18 +23,22 @@ package org.apache.qpid.client; import java.io.IOException; import javax.jms.JMSException; +import javax.jms.XASession; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.Session; public interface AMQConnectionDelegate { public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; - public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, + public Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException; + public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; + public void resubscribeSessions() throws JMSException, AMQException, FailoverException; public void closeConneciton(long timeout) throws JMSException, AMQException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 8b6b416f88..cd657570bf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -3,6 +3,7 @@ package org.apache.qpid.client; import java.io.IOException; import javax.jms.JMSException; +import javax.jms.XASession; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -11,8 +12,6 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; import org.apache.qpidity.client.Client; import org.apache.qpidity.QpidException; -import org.apache.qpidity.jms.SessionImpl; -import org.apache.qpidity.jms.ExceptionHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +65,31 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate } /** + * create an XA Session and start it if required. + */ + public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException + { + _conn.checkNotClosed(); + int channelId = _conn._idFactory.incrementAndGet(); + XASessionImpl session; + try + { + session = new XASessionImpl(_qpidConnection, _conn, channelId, prefetchHigh, prefetchLow); + _conn.registerSession(channelId, session); + if (_conn._started) + { + session.start(); + } + } + catch (Exception e) + { + throw new JMSAMQException("cannot create session", e); + } + return session; + } + + + /** * Make a connection with the broker * * @param brokerDetail The detail of the broker to connect to. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java index 3ad32d83fb..c22fcc3c7a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Iterator; import javax.jms.JMSException; +import javax.jms.XASession; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; @@ -101,6 +102,11 @@ public class AMQConnectionDelegate_0_8 implements AMQConnectionDelegate return createSession(transacted, acknowledgeMode, prefetch, prefetch); } + public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException + { + throw new UnsupportedOperationException("0_8 version does not provide XA support"); + } + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index 9b0183967d..dfc87e21b1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -24,13 +24,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Hashtable; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; +import javax.jms.*; import javax.naming.Context; import javax.naming.Name; import javax.naming.NamingException; @@ -45,7 +39,9 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; -public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, ObjectFactory, Referenceable +public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, + ObjectFactory, Referenceable, XATopicConnectionFactory, + XAQueueConnectionFactory, XAConnectionFactory { private String _host; private int _port; @@ -77,18 +73,17 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF _connectionDetails = url; } - /** + /** * This constructor is never used! */ - public AMQConnectionFactory(String broker, String username, String password, - String clientName, String virtualHost) throws URLSyntaxException + public AMQConnectionFactory(String broker, String username, String password, String clientName, String virtualHost) + throws URLSyntaxException { - this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + "/" + - virtualHost + "?brokerlist='" + broker + "'")); + this(new AMQConnectionURL( + ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + clientName + "/" + virtualHost + "?brokerlist='" + broker + "'")); } - /** + /** * This constructor is never used! */ public AMQConnectionFactory(String host, int port, String virtualPath) @@ -96,7 +91,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF this(host, port, "guest", "guest", virtualPath); } - /** + /** * This constructor is never used! */ public AMQConnectionFactory(String host, int port, String defaultUsername, String defaultPassword, @@ -144,17 +139,21 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF /** * Getter for SSLConfiguration + * * @return SSLConfiguration if set, otherwise null */ - public final SSLConfiguration getSSLConfiguration() { + public final SSLConfiguration getSSLConfiguration() + { return _sslConfig; } /** * Setter for SSLConfiguration + * * @param sslConfig config to store */ - public final void setSSLConfiguration(SSLConfiguration sslConfig) { + public final void setSSLConfiguration(SSLConfiguration sslConfig) + { _sslConfig = sslConfig; } @@ -355,8 +354,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF * @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory. * @throws Exception */ - public Object getObjectInstance(Object obj, Name name, Context ctx, - Hashtable env) throws Exception + public Object getObjectInstance(Object obj, Name name, Context ctx, Hashtable env) throws Exception { if (obj instanceof Reference) { @@ -409,10 +407,140 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF public Reference getReference() throws NamingException { - return new Reference( - AMQConnectionFactory.class.getName(), - new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()), - AMQConnectionFactory.class.getName(), - null); // factory location + return new Reference(AMQConnectionFactory.class.getName(), + new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()), + AMQConnectionFactory.class.getName(), null); // factory location + } + + // --------------------------------------------------------------------------------------------------- + // the following methods are provided for XA compatibility + // Those methods are only supported by 0_10 and above + // --------------------------------------------------------------------------------------------------- + + /** + * Creates a XAConnection with the default user identity. + * <p> The XAConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @return A newly created XAConnection + * @throws JMSException If creating the XAConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XAConnection createXAConnection() throws JMSException + { + if (_connectionDetails.getURLVersion() == ConnectionURL.URL_0_8) + { + throw new UnsupportedOperationException("This version does not support XA operations"); + } + else + { + try + { + return new XAConnectionImpl(_connectionDetails, _sslConfig); + } + catch (Exception e) + { + JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); + jmse.setLinkedException(e); + throw jmse; + } + } + } + + /** + * Creates a XAConnection with the specified user identity. + * <p> The XAConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @param username the caller's user name + * @param password the caller's password + * @return A newly created XAConnection. + * @throws JMSException If creating the XAConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XAConnection createXAConnection(String username, String password) throws JMSException + { + if (_connectionDetails != null) + { + _connectionDetails.setUsername(username); + _connectionDetails.setPassword(password); + + if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals("")) + { + _connectionDetails.setClientName(getUniqueClientID()); + } + } + else + { + throw new JMSException("A URL must be specified to access XA connections"); + } + return createXAConnection(); + } + + + /** + * Creates a XATopicConnection with the default user identity. + * <p> The XATopicConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @return A newly created XATopicConnection + * @throws JMSException If creating the XATopicConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XATopicConnection createXATopicConnection() throws JMSException + { + return (XATopicConnection) createXAConnection(); + } + + /** + * Creates a XATopicConnection with the specified user identity. + * <p> The XATopicConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @param username the caller's user name + * @param password the caller's password + * @return A newly created XATopicConnection. + * @throws JMSException If creating the XATopicConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XATopicConnection createXATopicConnection(String username, String password) throws JMSException + { + return (XATopicConnection) createXAConnection(username, password); + } + + /** + * Creates a XAQueueConnection with the default user identity. + * <p> The XAQueueConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @return A newly created XAQueueConnection + * @throws JMSException If creating the XAQueueConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XAQueueConnection createXAQueueConnection() throws JMSException + { + return (XAQueueConnection) createXAConnection(); + } + + /** + * Creates a XAQueueConnection with the specified user identity. + * <p> The XAQueueConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @param username the caller's user name + * @param password the caller's password + * @return A newly created XAQueueConnection. + * @throws JMSException If creating the XAQueueConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XAQueueConnection createXAQueueConnection(String username, String password) throws JMSException + { + return (XAQueueConnection) createXAConnection(username, password); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index c14b10903c..995f84bab9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -80,15 +80,15 @@ public class AMQSession_0_10 extends AMQSession * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. */ - AMQSession_0_10( org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, - int defaultPrefetchLowMark) + AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, + boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, + int defaultPrefetchHighMark, int defaultPrefetchLowMark) { super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); // create the qpid session with an expiry <= 0 so that the session does not expire - _qpidSession = qpidConnection.createSession(0); + _qpidSession = qpidConnection.createSession(0); // set the exception listnere for this session _qpidSession.setExceptionListener(new QpidSessionExceptionListener()); // set transacted if required @@ -108,8 +108,8 @@ public class AMQSession_0_10 extends AMQSession * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ - AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, - int defaultPrefetchLow) + AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, + boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index af4905710b..7876cc8e49 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -79,7 +79,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; // if there is a replyto destination then we need to request the exchange info - if (message.getMessageProperties().getReplyTo() != null) + if (! message.getMessageProperties().getReplyTo().getExchangeName().equals("")) { Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession() .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java new file mode 100644 index 0000000000..23d5c7bd62 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -0,0 +1,78 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.AMQException; + +import javax.jms.*; + +/** + * This class implements the javax.jms.XAConnection interface + */ +public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection +{ + //-- constructor + /** + * Create a XAConnection from a connectionURL + */ + public XAConnectionImpl(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException + { + super(connectionURL, sslConfig); + } + + //-- interface XAConnection + /** + * Creates an XASession. + * + * @return A newly created XASession. + * @throws JMSException If the XAConnectiono fails to create an XASession due to + * some internal error. + */ + public synchronized XASession createXASession() throws JMSException + { + checkNotClosed(); + return _delegate.createXASession(AMQSession.DEFAULT_PREFETCH_HIGH_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); + } + + //-- Interface XAQueueConnection + /** + * Creates an XAQueueSession. + * + * @return A newly created XASession. + * @throws JMSException If the XAQueueConnectionImpl fails to create an XASession due to + * some internal error. + */ + public XAQueueSession createXAQueueSession() throws JMSException + { + return (XAQueueSession) createXASession(); + } + + //-- Interface XATopicConnection + /** + * Creates an XAQueueSession. + * + * @return A newly created XASession. + * @throws JMSException If the XAQueueConnectionImpl fails to create an XASession due to + * some internal error. + */ + public XATopicSession createXATopicSession() throws JMSException + { + return (XATopicSession) createXASession(); + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java new file mode 100644 index 0000000000..c28a150a32 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -0,0 +1,507 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.dtx.XidImpl; +import org.apache.qpidity.transport.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of javax.jms.XAResource. + */ +public class XAResourceImpl implements XAResource +{ + /** + * this XAResourceImpl's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class); + + /** + * Reference to the associated XASession + */ + private XASessionImpl _xaSession = null; + + /** + * The XID of this resource + */ + private Xid _xid; + + //--- constructor + + /** + * Create an XAResource associated with a XASession + * + * @param xaSession The session XAresource + */ + protected XAResourceImpl(XASessionImpl xaSession) + { + _xaSession = xaSession; + } + + //--- The XAResource + /** + * Commits the global transaction specified by xid. + * + * @param xid A global transaction identifier + * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid. + * @throws XAException An error has occurred. An error has occurred. Possible XAExceptions are XA_HEURHAZ, + * XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO. + */ + public void commit(Xid xid, boolean b) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("commit ", xid); + } + if (xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + Future<DtxCoordinationCommitResult> future; + try + { + future = _xaSession.getQpidSession() + .dtxCoordinationCommit(XidImpl.convertToString(xid), b ? Option.ONE_PHASE : Option.NO_OPTION); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } + // now wait on the future for the result + DtxCoordinationCommitResult result = future.get(); + int status = result.getStatus(); + switch (status) + { + case Constant.XA_OK: + // do nothing this ok + break; + case Constant.XA_HEURHAZ: + throw new XAException(XAException.XA_HEURHAZ); + case Constant.XA_HEURCOM: + throw new XAException(XAException.XA_HEURCOM); + case Constant.XA_HEURRB: + throw new XAException(XAException.XA_HEURRB); + case Constant.XA_HEURMIX: + throw new XAException(XAException.XA_HEURMIX); + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); + } + } + + /** + * Ends the work performed on behalf of a transaction branch. + * The resource manager disassociates the XA resource from the transaction branch specified + * and lets the transaction complete. + * <ul> + * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state. + * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified. + * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only + * <li> If TMSUCCESS is specified, the portion of work has completed successfully. + * /ul> + * + * @param xid A global transaction identifier that is the same as the identifier used previously in the start method + * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND. + * @throws XAException An error has occurred. An error has occurred. Possible XAException values are XAER_RMERR, + * XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*. + */ + public void end(Xid xid, int flag) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("end ", xid); + } + if (xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + Future<DtxDemarcationEndResult> future; + try + { + future = _xaSession.getQpidSession() + .dtxDemarcationEnd(XidImpl.convertToString(xid), + flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, + flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } + // now wait on the future for the result + DtxDemarcationEndResult result = future.get(); + int status = result.getStatus(); + switch (status) + { + case Constant.XA_OK: + // do nothing this ok + break; + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); + } + } + + /** + * Tells the resource manager to forget about a heuristically completed transaction branch. + * + * @param xid String(xid.getGlobalTransactionId() A global transaction identifier + * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, + * XAER_NOTA, XAER_INVAL, or XAER_PROTO. + */ + public void forget(Xid xid) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("forget ", xid); + } + if (xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + _xaSession.getQpidSession().dtxCoordinationForget(new String(xid.getGlobalTransactionId())); + } + + /** + * Obtains the current transaction timeout value set for this XAResource instance. + * If XAResource.setTransactionTimeout was not used prior to invoking this method, + * the return value is the default timeout i.e. 0; + * otherwise, the value used in the previous setTransactionTimeout call is returned. + * + * @return The transaction timeout value in seconds. + * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. + */ + public int getTransactionTimeout() throws XAException + { + int result = 0; + if (_xid != null) + { + try + { + Future<DtxCoordinationGetTimeoutResult> future = + _xaSession.getQpidSession().dtxCoordinationGetTimeout(XidImpl.convertToString(_xid)); + result = (int) future.get().getTimeout(); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } + } + return result; + } + + /** + * This method is called to determine if the resource manager instance represented + * by the target object is the same as the resouce manager instance represented by + * the parameter xaResource. + * + * @param xaResource An XAResource object whose resource manager instance is to + * be compared with the resource manager instance of the target object + * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>. + * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. + */ + public boolean isSameRM(XAResource xaResource) throws XAException + { + // TODO : get the server identity of xaResource and compare it with our own one + return false; + } + + /** + * Prepare for a transaction commit of the transaction specified in <code>Xid</code>. + * + * @param xid A global transaction identifier. + * @return A value indicating the resource manager's vote on the outcome of the transaction. + * The possible values are: XA_RDONLY or XA_OK. + * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA + */ + public int prepare(Xid xid) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("prepare ", xid); + } + if (xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + Future<DtxCoordinationPrepareResult> future; + try + { + future = _xaSession.getQpidSession() + .dtxCoordinationPrepare(XidImpl.convertToString(xid)); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } + DtxCoordinationPrepareResult result = future.get(); + int status = result.getStatus(); + int outcome; + switch (status) + { + case Constant.XA_OK: + outcome = XAResource.XA_OK; + break; + case Constant.XA_RDONLY: + outcome = XAResource.XA_RDONLY; + break; + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); + } + return outcome; + } + + /** + * Obtains a list of prepared transaction branches. + * <p/> + * The transaction manager calls this method during recovery to obtain the list of transaction branches + * that are currently in prepared or heuristically completed states. + * + * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. + * TMNOFLAGS must be used when no other flags are set in the parameter. + * @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically + * completed state. + * @throws XAException An error has occurred. Possible value is XAER_INVAL. + */ + public Xid[] recover(int flag) throws XAException + { + // the flag is ignored + Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession().dtxCoordinationRecover(); + DtxCoordinationRecoverResult res = future.get(); + // todo make sure that the keys of the returned map are the xids + Xid[] result = new Xid[res.getInDoubt().size()]; + int i = 0; + try + { + for (String xid : res.getInDoubt().keySet()) + { + result[i] = new XidImpl(xid); + i++; + } + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert string into Xid ", e); + } + throw new XAException(XAException.XAER_PROTO); + } + return result; + } + + /** + * Informs the resource manager to roll back work done on behalf of a transaction branch + * + * @param xid A global transaction identifier. + * @throws XAException An error has occurred. + */ + public void rollback(Xid xid) throws XAException + { + if (xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + // the flag is ignored + Future<DtxCoordinationRollbackResult> future; + try + { + future = _xaSession.getQpidSession() + .dtxCoordinationRollback(XidImpl.convertToString(xid)); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } + // now wait on the future for the result + DtxCoordinationRollbackResult result = future.get(); + int status = result.getStatus(); + switch (status) + { + case Constant.XA_OK: + // do nothing this ok + break; + case Constant.XA_HEURHAZ: + throw new XAException(XAException.XA_HEURHAZ); + case Constant.XA_HEURCOM: + throw new XAException(XAException.XA_HEURCOM); + case Constant.XA_HEURRB: + throw new XAException(XAException.XA_HEURRB); + case Constant.XA_HEURMIX: + throw new XAException(XAException.XA_HEURMIX); + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); + } + } + + /** + * Sets the current transaction timeout value for this XAResource instance. + * Once set, this timeout value is effective until setTransactionTimeout is + * invoked again with a different value. + * To reset the timeout value to the default value used by the resource manager, set the value to zero. + * + * @param timeout The transaction timeout value in seconds. + * @return true if transaction timeout value is set successfully; otherwise false. + * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, or XAER_INVAL. + */ + public boolean setTransactionTimeout(int timeout) throws XAException + { + boolean result = false; + if (_xid != null) + { + try + { + _xaSession.getQpidSession() + .dtxCoordinationSetTimeout(XidImpl.convertToString(_xid), timeout); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } + result = true; + } + return result; + } + + /** + * Starts work on behalf of a transaction branch specified in xid. + * <ul> + * <li> If TMJOIN is specified, an exception is thrown as it is not supported + * <li> If TMRESUME is specified, the start applies to resuming a suspended transaction specified in the parameter xid. + * <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid has previously been seen by the + * resource manager, the resource manager throws the XAException exception with XAER_DUPID error code. + * </ul> + * + * @param xid A global transaction identifier to be associated with the resource + * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME + * @throws XAException An error has occurred. Possible exceptions + * are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO. + */ + public void start(Xid xid, int flag) throws XAException + { + if (_logger.isDebugEnabled()) + { + _logger.debug("start ", xid); + } + if (xid == null) + { + throw new XAException(XAException.XAER_PROTO); + } + _xid = xid; + Future<DtxDemarcationStartResult> future; + try + { + future = _xaSession.getQpidSession() + .dtxDemarcationStart(XidImpl.convertToString(xid), + flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, + flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Cannot convert Xid into String format ", e); + } + throw new XAException(XAException.XAER_PROTO); + } + // now wait on the future for the result + DtxDemarcationStartResult result = future.get(); + int status = result.getStatus(); + switch (status) + { + case Constant.XA_OK: + // do nothing this ok + break; + case Constant.XA_RBROLLBACK: + throw new XAException(XAException.XA_RBROLLBACK); + case Constant.XA_RBTIMEOUT: + throw new XAException(XAException.XA_RBTIMEOUT); + default: + // this should not happen + if (_logger.isDebugEnabled()) + { + _logger.debug("got unexpected status value: ", status); + } + throw new XAException(XAException.XAER_PROTO); + } + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java new file mode 100644 index 0000000000..8323070625 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -0,0 +1,147 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +import org.apache.qpidity.client.DtxSession; +import org.apache.qpid.client.message.MessageFactoryRegistry; + +import javax.jms.*; +import javax.transaction.xa.XAResource; + +/** + * This is an implementation of the javax.jms.XASEssion interface. + */ +public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopicSession, XAQueueSession +{ + /** + * XAResource associated with this XASession + */ + private final XAResourceImpl _xaResource; + + /** + * This XASession Qpid DtxSession + */ + private DtxSession _qpidDtxSession; + + /** + * The standard session + */ + private Session _jmsSession; + + + //-- Constructors + /** + * Create a JMS XASession + */ + public XASessionImpl(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId, + int defaultPrefetchHigh, int defaultPrefetchLow) + { + super(qpidConnection, con, channelId, false, // this is not a transacted session + Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted + MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); + _qpidDtxSession = qpidConnection.createDTXSession(0); + _xaResource = new XAResourceImpl(this); + } + + //--- javax.jms.XASEssion API + + /** + * Gets the session associated with this XASession. + * + * @return The session object. + * @throws JMSException if an internal error occurs. + */ + public Session getSession() throws JMSException + { + if (_jmsSession == null) + { + _jmsSession = getAMQConnection().createSession(true, getAcknowledgeMode()); + } + return _jmsSession; + } + + /** + * Returns an XA resource. + * + * @return An XA resource. + */ + public XAResource getXAResource() + { + return _xaResource; + } + + //-- overwritten mehtods + /** + * Throws a {@link TransactionInProgressException}, since it should + * not be called for an XASession object. + * + * @throws TransactionInProgressException always. + */ + public void commit() throws JMSException + { + throw new TransactionInProgressException( + "XASession: A direct invocation of the commit operation is probibited!"); + } + + /** + * Throws a {@link TransactionInProgressException}, since it should + * not be called for an XASession object. + * + * @throws TransactionInProgressException always. + */ + public void rollback() throws JMSException + { + throw new TransactionInProgressException( + "XASession: A direct invocation of the rollback operation is probibited!"); + } + + /** + * Access to the underlying Qpid Session + * + * @return The associated Qpid Session. + */ + protected org.apache.qpidity.client.DtxSession getQpidSession() + { + return _qpidDtxSession; + } + + //--- interface XAQueueSession + /** + * Gets the topic session associated with this <CODE>XATopicSession</CODE>. + * + * @return the topic session object + * @throws JMSException If an internal error occurs. + */ + public QueueSession getQueueSession() throws JMSException + { + return (QueueSession) getSession(); + } + + //--- interface XATopicSession + + /** + * Gets the topic session associated with this <CODE>XATopicSession</CODE>. + * + * @return the topic session object + * @throws JMSException If an internal error occurs. + */ + public TopicSession getTopicSession() throws JMSException + { + return (TopicSession) getSession(); + } +} |