diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /java/client/src | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
31 files changed, 489 insertions, 213 deletions
diff --git a/java/client/src/main/java/client.bnd b/java/client/src/main/java/client.bnd index 495ea6793f..4b9b191520 100755 --- a/java/client/src/main/java/client.bnd +++ b/java/client/src/main/java/client.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.17.0 +ver: 0.19.0 Bundle-SymbolicName: qpid-client Bundle-Version: ${ver} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 23b47c8d67..d80858a7a1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -69,6 +69,7 @@ import javax.naming.StringRefAddr; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.ConnectException; +import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; import java.util.ArrayList; @@ -78,11 +79,14 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); + private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); + private final long _connectionNumber; /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be @@ -222,6 +226,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new IllegalArgumentException("Connection must be specified"); } + _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Connection(" + _connectionNumber + "):" + connectionURL); + } + // set this connection maxPrefetch if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) { @@ -308,11 +319,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _delegate = new AMQConnectionDelegate_0_10(this); } - if (_logger.isDebugEnabled()) - { - _logger.debug("Connection:" + connectionURL); - } - _connectionURL = connectionURL; _clientName = connectionURL.getClientName(); @@ -1519,4 +1525,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _delegate; } + + public Long getConnectionNumber() + { + return _connectionNumber; + } + + protected void logConnected(SocketAddress localAddress, SocketAddress remoteAddress) + { + if(_logger.isInfoEnabled()) + { + _logger.info("Connection " + _connectionNumber + " now connected from " + + localAddress + " to " + remoteAddress); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index a1a06c5547..51e7e4153d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -222,6 +222,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn.setUsername(_qpidConnection.getUserID()); _conn.setMaximumChannelCount(_qpidConnection.getChannelMax()); _conn.getFailoverPolicy().attainedConnection(); + _conn.logConnected(_qpidConnection.getLocalAddress(), _qpidConnection.getRemoteAddress()); } catch (ProtocolVersionException pe) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 08ee7c3705..e1bf007e83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -24,13 +24,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.BasicQosBody; @@ -68,7 +66,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; - public void closeConnection(long timeout) throws JMSException, AMQException { _conn.getProtocolHandler().closeConnection(timeout); @@ -110,9 +107,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate sslContext = SSLContextFactory.buildClientContext( settings.getTrustStorePath(), settings.getTrustStorePassword(), + settings.getTrustStoreType(), settings.getTrustManagerFactoryAlgorithm(), settings.getKeyStorePath(), settings.getKeyStorePassword(), + settings.getKeyStoreType(), settings.getKeyManagerFactoryAlgorithm(), settings.getCertAlias()); } @@ -137,6 +136,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { _conn.getFailoverPolicy().attainedConnection(); _conn.setConnected(true); + _conn.logConnected(network.getLocalAddress(), network.getRemoteAddress()); return null; } else @@ -283,7 +283,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); + + // reset the flow control flag + // on opening channel, broker sends flow blocked if virtual host is blocked + // if virtual host is not blocked, then broker does not send flow command + // that's why we need to reset the flow control flag + s.setFlowControl(true); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted()); + s.resubscribe(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index cc91746d98..8bc815d98e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -55,12 +55,13 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF ObjectFactory, Referenceable, XATopicConnectionFactory, XAQueueConnectionFactory, XAConnectionFactory { - private final ConnectionURL _connectionDetails; + protected static final String NO_URL_CONFIGURED = "The connection factory wasn't created with a proper URL, the connection details are empty"; + + private ConnectionURL _connectionDetails; // The default constructor is necessary to allow AMQConnectionFactory to be deserialised from JNDI public AMQConnectionFactory() { - _connectionDetails = null; } public AMQConnectionFactory(final String url) throws URLSyntaxException @@ -106,6 +107,11 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF public Connection createConnection() throws JMSException { + if(_connectionDetails == null) + { + throw new JMSException(NO_URL_CONFIGURED); + } + try { if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals("")) @@ -158,7 +164,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF } else { - throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty"); + throw new JMSException(NO_URL_CONFIGURED); } } @@ -193,6 +199,12 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF return _connectionDetails.toString(); } + //setter necessary to use instances created with the default constructor (which we can't remove) + public final void setConnectionURLString(String url) throws URLSyntaxException + { + _connectionDetails = new AMQConnectionURL(url); + } + /** * JNDI interface to create objects from References. * @@ -332,7 +344,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF } else { - throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty"); + throw new JMSException(NO_URL_CONFIGURED); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 55d3ccb6e7..1468e90c4e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,18 +122,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; - /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked */ - private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, + DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); /** * The period to wait while flow controlled before declaring a failure */ - private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", + private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, DEFAULT_FLOW_CONTROL_WAIT_FAILURE); private final boolean _delareQueues = @@ -797,11 +801,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (e instanceof AMQDisconnectedException) { - if (_dispatcherThread != null) - { - // Failover failed and ain't coming back. Knife the dispatcher. - _dispatcherThread.interrupt(); - } + // Failover failed and ain't coming back. Knife the dispatcher. + stopDispatcherThread(); } @@ -830,6 +831,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + protected void stopDispatcherThread() + { + if (_dispatcherThread != null) + { + _dispatcherThread.interrupt(); + } + } /** * Commits all messages done in this transaction and releases any locks currently held. * @@ -2366,7 +2374,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new Error("Error creating Dispatcher thread",e); } - _dispatcherThread.setName("Dispatcher-Channel-" + _channelId); + + String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber(); + + _dispatcherThread.setName(dispatcherThreadName); _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD); _dispatcher.setConnectionStopped(initiallyStopped); _dispatcherThread.start(); @@ -3130,6 +3141,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _ticket = ticket; } + public boolean isFlowBlocked() + { + synchronized (_flowControl) + { + return !_flowControl.getFlowControl(); + } + } + public void setFlowControl(final boolean active) { _flowControl.setFlowControl(active); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 3902c726f3..8a7c6b1a01 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -128,7 +128,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Used to store the range of in tx messages */ private final RangeSet _txRangeSet = RangeSetFactory.createRangeSet(); - private int _txSize = 0; + private int _txSize = 0; + private boolean _isHardError = Boolean.getBoolean("qpid.session.legacy_exception_behaviour"); //--- constructors /** @@ -390,11 +391,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ public void sendClose(long timeout) throws AMQException, FailoverException { - if (flushTask != null) - { - flushTask.cancel(); - flushTask = null; - } + cancelTimerTask(); flushAcknowledgments(); try { @@ -1051,9 +1048,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { code = ee.getErrorCode().getValue(); } - AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); + AMQException amqe = new AMQException(AMQConstant.getConstant(code), _isHardError, se.getMessage(), se.getCause()); _currentException = amqe; } + if (!_isHardError) + { + cancelTimerTask(); + stopDispatcherThread(); + try + { + closed(_currentException); + } + catch(Exception e) + { + _logger.warn("Error closing session", e); + } + } getAMQConnection().exceptionReceived(_currentException); } @@ -1408,5 +1418,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } + @Override + public boolean isFlowBlocked() + { + return _qpidSession.isFlowBlocked(); + } + + private void cancelTimerTask() + { + if (flushTask != null) + { + flushTask.cancel(); + flushTask = null; + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java b/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java new file mode 100644 index 0000000000..cce6b91781 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java @@ -0,0 +1,32 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.util.List; + +import javax.transaction.xa.XAResource; + +public interface AMQXAResource extends XAResource +{ + public String getBrokerUUID(); + + public List<XAResource> getSiblings(); +} diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index af9048f1f5..6341510c2f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -17,8 +17,13 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import org.apache.qpid.dtx.XidImpl; import org.apache.qpid.transport.DtxXaStatus; @@ -28,15 +33,13 @@ import org.apache.qpid.transport.Option; import org.apache.qpid.transport.RecoverResult; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.XaResult; - -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is an implementation of javax.njms.XAResource. */ -public class XAResourceImpl implements XAResource +public class XAResourceImpl implements AMQXAResource { /** * this XAResourceImpl's logger @@ -57,9 +60,11 @@ public class XAResourceImpl implements XAResource * The time for this resource */ private int _timeout; - + //--- constructor - + + private List<XAResource> _siblings = new ArrayList<XAResource>(); + /** * Create an XAResource associated with a XASession * @@ -157,7 +162,21 @@ public class XAResourceImpl implements XAResource _xaSession.createSession(); convertExecutionErrorToXAErr(e.getException().getErrorCode()); } + checkStatus(result.getStatus()); + + if(_logger.isDebugEnabled()) + { + _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings"); + } + + for(XAResource sibling: _siblings) + { + + sibling.end(xid, flag); + } + + _siblings.clear(); } @@ -216,28 +235,38 @@ public class XAResourceImpl implements XAResource * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL. */ public boolean isSameRM(XAResource xaResource) throws XAException - { + { if(this == xaResource) { - return true; - } - if(!(xaResource instanceof XAResourceImpl)) + return true; + } + + if(!(xaResource instanceof AMQXAResource)) { - return false; + return false; } - - XAResourceImpl other = (XAResourceImpl)xaResource; - String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); - String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID(); - + String myUUID = getBrokerUUID(); + String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID(); + if(_logger.isDebugEnabled()) { _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID); } - - return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); - + + boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID)); + + if(isSameRm) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. "); + } + _siblings.add(xaResource); + } + + return isSameRm; + } /** @@ -369,12 +398,12 @@ public class XAResourceImpl implements XAResource { _timeout = timeout; if (timeout != _timeout && _xid != null) - { + { setDtxTimeout(_timeout); } return true; } - + private void setDtxTimeout(int timeout) throws XAException { _xaSession.getQpidSession() @@ -437,18 +466,23 @@ public class XAResourceImpl implements XAResource { setDtxTimeout(_timeout); } + + for(XAResource sibling: _siblings) + { + sibling.start(xid, flag); + } } /** * Is this resource currently enlisted in a transaction? - * + * * @return true if the resource is associated with a transaction, false otherwise. */ public boolean isEnlisted() { return (_xid != null) ; } - + //------------------------------------------------------------------------ // Private methods //------------------------------------------------------------------------ @@ -517,7 +551,7 @@ public class XAResourceImpl implements XAResource } catch (XAException e) { - e.printStackTrace(); + _logger.error(e.getMessage(), e); throw e; } case ILLEGAL_STATE: @@ -544,7 +578,7 @@ public class XAResourceImpl implements XAResource * convert a generic xid into qpid format * @param xid xid to be converted * @return the qpid formated xid - * @throws XAException when xid is null + * @throws XAException when xid is null */ private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException { @@ -556,4 +590,13 @@ public class XAResourceImpl implements XAResource return XidImpl.convert(xid); } + public String getBrokerUUID() + { + return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID(); + } + + public List<XAResource> getSiblings() + { + return Collections.unmodifiableList(_siblings); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index f2efb6e8a5..05bd121bbd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -17,6 +17,7 @@ */ package org.apache.qpid.client; +import org.apache.qpid.AMQException; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.transport.RangeSet; @@ -31,7 +32,7 @@ import javax.jms.XATopicSession; import javax.transaction.xa.XAResource; /** - * This is an implementation of the javax.njms.XASEssion interface. + * This is an implementation of the javax.jms.XASession interface. */ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopicSession, XAQueueSession { @@ -67,7 +68,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic { this(qpidConnection, con, channelId, false, ackMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow, null); - + } public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, @@ -92,9 +93,6 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic _qpidDtxSession.dtxSelect(); } - - // javax.njms.XASEssion API - /** * Gets the session associated with this XASession. * @@ -192,4 +190,11 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic super.acknowledgeImpl() ; } } + + @Override + void resubscribe() throws AMQException + { + super.resubscribe(); + createSession(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java index 3a3ddae52f..fa544f2d2e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client.handler; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.client.handler; * under the License. * */ +package org.apache.qpid.client.handler; import org.slf4j.Logger; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java index 919c5f6d67..46b1f08db3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.client.handler; import org.slf4j.Logger; @@ -8,26 +28,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ChannelFlowBody; -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ public class ChannelFlowMethodHandler implements StateAwareMethodListener<ChannelFlowBody> { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java index 11d99f5446..69ecd7bfba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client.message; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.client.message; * under the License. * */ +package org.apache.qpid.client.message; import org.apache.qpid.AMQException; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java index f997862bb0..183ec4cb40 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client.message; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.client.message; * under the License. * */ +package org.apache.qpid.client.message; import org.apache.qpid.AMQException; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index d1e43447cc..9e15b08f12 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -321,7 +321,40 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message protected abstract String getMimeType(); + public String toHeaderString() throws JMSException + { + StringBuffer buf = new StringBuffer(); + buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID()); + buf.append("\nJMS timestamp: ").append(getJMSTimestamp()); + buf.append("\nJMS expiration: ").append(getJMSExpiration()); + buf.append("\nJMS priority: ").append(getJMSPriority()); + buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); + buf.append("\nJMS reply to: ").append(getReplyToString()); + buf.append("\nJMS Redelivered: ").append(_redelivered); + buf.append("\nJMS Destination: ").append(getJMSDestination()); + buf.append("\nJMS Type: ").append(getJMSType()); + buf.append("\nJMS MessageID: ").append(getJMSMessageID()); + buf.append("\nJMS Content-Type: ").append(getContentType()); + buf.append("\nAMQ message number: ").append(getDeliveryTag()); + + buf.append("\nProperties:"); + final Enumeration propertyNames = getPropertyNames(); + if (!propertyNames.hasMoreElements()) + { + buf.append("<NONE>"); + } + else + { + buf.append('\n'); + while(propertyNames.hasMoreElements()) + { + String propertyName = (String) propertyNames.nextElement(); + buf.append("\t").append(propertyName).append(" = ").append(getObjectProperty(propertyName)).append("\n"); + } + } + return buf.toString(); + } public String toString() { @@ -330,35 +363,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message StringBuffer buf = new StringBuffer("Body:\n"); buf.append(toBodyString()); - buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID()); - buf.append("\nJMS timestamp: ").append(getJMSTimestamp()); - buf.append("\nJMS expiration: ").append(getJMSExpiration()); - buf.append("\nJMS priority: ").append(getJMSPriority()); - buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); - buf.append("\nJMS reply to: ").append(getReplyToString()); - buf.append("\nJMS Redelivered: ").append(_redelivered); - buf.append("\nJMS Destination: ").append(getJMSDestination()); - buf.append("\nJMS Type: ").append(getJMSType()); - buf.append("\nJMS MessageID: ").append(getJMSMessageID()); - buf.append("\nJMS Content-Type: ").append(getContentType()); - buf.append("\nAMQ message number: ").append(getDeliveryTag()); - - buf.append("\nProperties:"); - final Enumeration propertyNames = getPropertyNames(); - if (!propertyNames.hasMoreElements()) - { - buf.append("<NONE>"); - } - else - { - buf.append('\n'); - while(propertyNames.hasMoreElements()) - { - String propertyName = (String) propertyNames.nextElement(); - buf.append("\t").append(propertyName).append(" = ").append(getObjectProperty(propertyName)).append("\n"); - } - - } + buf.append(toHeaderString()); return buf.toString(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java index bd63cdb5c5..c3f36a545a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client.message; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.client.message; * under the License. * */ +package org.apache.qpid.client.message; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java index 6e5f33a65c..e253e43a86 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client.message; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.client.message; * under the License. * */ +package org.apache.qpid.client.message; import org.apache.qpid.framing.AMQShortString; diff --git a/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClient.java b/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClient.java new file mode 100644 index 0000000000..9965176772 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClient.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.security.crammd5hashed; + +import java.util.Map; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler; + +/** + * A {@link CRAMMD5HashedSaslClient} merely wraps an instance of a CRAM-MD5 SASL client delegating + * all method calls to it, except {@link #getMechanismName()} which returns "CRAM-MD5-HASHED". + * + * This mechanism must be used with {@link UsernameHashedPasswordCallbackHandler} which is responsible + * for the additional hash of the password. + */ +public class CRAMMD5HashedSaslClient implements SaslClient +{ + private final SaslClient _cramMd5SaslClient; + + public CRAMMD5HashedSaslClient(String authorizationId, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh) throws SaslException + { + super(); + String[] mechanisms = {"CRAM-MD5"}; + _cramMd5SaslClient = Sasl.createSaslClient(mechanisms, authorizationId, protocol, serverName, props, cbh); + } + + public void dispose() throws SaslException + { + _cramMd5SaslClient.dispose(); + } + + public String getMechanismName() + { + return CRAMMD5HashedSaslClientFactory.MECHANISM; + } + + public byte[] evaluateChallenge(byte[] challenge) throws SaslException + { + return _cramMd5SaslClient.evaluateChallenge(challenge); + } + + + public Object getNegotiatedProperty(String propName) + { + return _cramMd5SaslClient.getNegotiatedProperty(propName); + } + + public boolean hasInitialResponse() + { + return _cramMd5SaslClient.hasInitialResponse(); + } + + public boolean isComplete() + { + return _cramMd5SaslClient.isComplete(); + } + + public byte[] unwrap(byte[] incoming, int offset, int len) + throws SaslException + { + return _cramMd5SaslClient.unwrap(incoming, offset, len); + } + + public byte[] wrap(byte[] outgoing, int offset, int len) + throws SaslException + { + return _cramMd5SaslClient.wrap(outgoing, offset, len); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java b/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java index cb989f7919..b3ce1a0d23 100644 --- a/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java @@ -44,14 +44,13 @@ public class CRAMMD5HashedSaslClientFactory implements SaslClientFactory throw new SaslException("CallbackHandler must not be null"); } - String[] mechs = {"CRAM-MD5"}; - return Sasl.createSaslClient(mechs, authorizationId, protocol, serverName, props, cbh); + return new CRAMMD5HashedSaslClient(authorizationId, protocol, serverName, props, cbh); } } return null; } - public String[] getMechanismNames(Map props) + public String[] getMechanismNames(Map<String,?> props) { if (props != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 7d028e022a..0b6217ffce 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -125,9 +125,9 @@ public class AMQStateManager implements AMQMethodListener */ public void setProtocolSession(AMQProtocolSession session) { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Setting ProtocolSession:" + session); + _logger.debug("Setting ProtocolSession:" + session); } _protocolSession = session; } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index c8903d252f..fd2f003a56 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -61,7 +61,10 @@ public class StateWaiter extends BlockingWaiter<AMQState> */ public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates) { - _logger.info("New StateWaiter :" + currentState + ":" + awaitStates); + if(_logger.isDebugEnabled()) + { + _logger.debug("New StateWaiter :" + currentState + ":" + awaitStates); + } _stateManager = stateManager; _awaitStates = awaitStates; _startState = currentState; diff --git a/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java b/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java index f303d155c6..516ee8cf37 100644 --- a/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java +++ b/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client.url; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.client.url; * under the License. * */ +package org.apache.qpid.client.url; import org.apache.qpid.client.AMQBrokerDetails; diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index f4d2ecc36d..0b4f0800d2 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -33,9 +33,7 @@ public class FailoverPolicy { private static final Logger _logger = LoggerFactory.getLogger(FailoverPolicy.class); - private static final long MINUTE = 60000L; - - private static final long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE; + private final long DEFAULT_METHOD_TIMEOUT = Long.getLong("qpid.failover_method_timeout", 120000); private FailoverMethod[] _methods = new FailoverMethod[1]; diff --git a/java/client/src/main/java/org/apache/qpid/jms/TopicSubscriber.java b/java/client/src/main/java/org/apache/qpid/jms/TopicSubscriber.java index 1dbe464230..fe43ae8cd0 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/TopicSubscriber.java +++ b/java/client/src/main/java/org/apache/qpid/jms/TopicSubscriber.java @@ -1,4 +1,3 @@ -package org.apache.qpid.jms; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.jms; * under the License. * */ +package org.apache.qpid.jms; import org.apache.qpid.AMQException; diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 9b202a13ee..f17fb9b5f5 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -69,46 +69,21 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor public Context getInitialContext(Hashtable environment) throws NamingException { Map data = new ConcurrentHashMap(); - + BufferedInputStream inputStream = null; try { - String file = null; - - if (environment.containsKey(Context.PROVIDER_URL)) - { - file = (String) environment.get(Context.PROVIDER_URL); - } - else - { - file = System.getProperty(Context.PROVIDER_URL); - } + String fileName = (environment.containsKey(Context.PROVIDER_URL)) + ? (String)environment.get(Context.PROVIDER_URL) : System.getProperty(Context.PROVIDER_URL); - // Load the properties specified - if (file != null) + if (fileName != null) { - _logger.info("Loading Properties from:" + file); - BufferedInputStream inputStream = null; + _logger.info("Attempting to load " + fileName); - if(file.contains("file:")) - { - inputStream = new BufferedInputStream(new FileInputStream(new File(new URI(file)))); - } - else - { - inputStream = new BufferedInputStream(new FileInputStream(file)); - } - + inputStream = new BufferedInputStream(new FileInputStream((fileName.contains("file:")) + ? new File(new URI(fileName)) : new File(fileName))); Properties p = new Properties(); - - try - { - p.load(inputStream); - } - finally - { - inputStream.close(); - } + p.load(inputStream); Strings.Resolver resolver = new Strings.ChainedResolver (Strings.SYSTEM_RESOLVER, new Strings.PropertiesResolver(p)); @@ -134,12 +109,23 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor catch (IOException ioe) { _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" + - "Due to:"+ioe.getMessage()); + "Due to:" + ioe.getMessage()); } catch(URISyntaxException uoe) { _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" + - "Due to:"+uoe.getMessage()); + "Due to:" + uoe.getMessage()); + } + finally + { + try + { + if(inputStream != null) + { + inputStream.close(); + } + } + catch(Exception ignore){} } createConnectionFactories(data, environment); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java index 7134f0a960..e8455f6dfa 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java @@ -1,4 +1,3 @@ -package org.apache.qpid.nclient.util; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.nclient.util; * under the License. * */ +package org.apache.qpid.nclient.util; import org.apache.qpid.api.Message; diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java index 9a2e9de3d9..a3fce7611f 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java @@ -1,4 +1,3 @@ -package org.apache.qpid.nclient.util; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.nclient.util; * under the License. * */ +package org.apache.qpid.nclient.util; import org.apache.qpid.nclient.MessagePartListener; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionFactoryTest.java index 20496026ce..bb92fa4ecd 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionFactoryTest.java @@ -18,7 +18,9 @@ * under the License. * */ -package org.apache.qpid.test.unit.jndi; +package org.apache.qpid.client; + +import javax.jms.JMSException; import junit.framework.TestCase; @@ -26,7 +28,7 @@ import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; -public class ConnectionFactoryTest extends TestCase +public class AMQConnectionFactoryTest extends TestCase { //URL will be returned with the password field swapped for '********' @@ -58,6 +60,20 @@ public class ConnectionFactoryTest extends TestCase assertEquals("tcp", service.getTransport()); assertEquals("localhost", service.getHost()); assertEquals(5672, service.getPort()); + } + + public void testInstanceCreatedWithDefaultConstructorThrowsExceptionOnCallingConnectWithoutSettingURL() throws Exception + { + AMQConnectionFactory factory = new AMQConnectionFactory(); + try + { + factory.createConnection(); + fail("Expected exception not thrown"); + } + catch(JMSException e) + { + assertEquals("Unexpected exception", AMQConnectionFactory.NO_URL_CONFIGURED, e.getMessage()); + } } } diff --git a/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java b/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java index 1fbd7cf212..c535fdd705 100644 --- a/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client.message; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.client.message; * under the License. * */ +package org.apache.qpid.client.message; import junit.framework.TestCase; diff --git a/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java b/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java index 2989970dcd..ce9e681eaf 100644 --- a/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java +++ b/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java @@ -21,51 +21,47 @@ package org.apache.qpid.jndi; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; import java.util.Properties; +import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Queue; import javax.jms.Topic; import javax.naming.ConfigurationException; import javax.naming.Context; import javax.naming.InitialContext; +import javax.naming.NamingException; -import junit.framework.TestCase; - +import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidTestCase; -public class PropertiesFileInitialContextFactoryTest extends TestCase +public class PropertiesFileInitialContextFactoryTest extends QpidTestCase { - private static final String FILE_URL_PATH = System.getProperty("user.dir") + "/client/src/test/java/org/apache/qpid/jndi/"; - private static final String FILE_NAME = "hello.properties"; - - private Context ctx; - - protected void setUp() throws Exception - { - Properties properties = new Properties(); - properties.load(this.getClass().getResourceAsStream("JNDITest.properties")); - - //Create the initial context - ctx = new InitialContext(properties); - } - + private static final String CONNECTION_URL = "amqp://username:password@clientid/test?brokerlist='tcp://testContextFromProviderURL:5672'"; public void testQueueNamesWithTrailingSpaces() throws Exception { + Context ctx = prepareContext(); Queue queue = (Queue)ctx.lookup("QueueNameWithSpace"); assertEquals("QueueNameWithSpace",queue.getQueueName()); } public void testTopicNamesWithTrailingSpaces() throws Exception { + Context ctx = prepareContext(); Topic topic = (Topic)ctx.lookup("TopicNameWithSpace"); assertEquals("TopicNameWithSpace",topic.getTopicName()); } public void testMultipleTopicNamesWithTrailingSpaces() throws Exception { + Context ctx = prepareContext(); Topic topic = (Topic)ctx.lookup("MultipleTopicNamesWithSpace"); int i = 0; for (AMQShortString bindingKey: ((AMQDestination)topic).getBindingKeys()) @@ -83,13 +79,59 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase try { - ctx = new InitialContext(properties); + new InitialContext(properties); fail("A configuration exception should be thrown with details about the address syntax error"); } catch(ConfigurationException e) { assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}")); } + } + + private InitialContext prepareContext() throws IOException, NamingException + { + Properties properties = new Properties(); + properties.load(this.getClass().getResourceAsStream("JNDITest.properties")); + return new InitialContext(properties); + } + + /** + * Test loading of a JNDI properties file through use of a file:// URL + * supplied via the InitialContext.PROVIDER_URL system property. + */ + public void testContextFromProviderURL() throws Exception + { + Properties properties = new Properties(); + properties.put("connectionfactory.qpidConnectionfactory", CONNECTION_URL); + properties.put("destination.topicExchange", "destName"); + + File f = File.createTempFile(getTestName(), ".properties"); + try + { + FileOutputStream fos = new FileOutputStream(f); + properties.store(fos, null); + fos.close(); + + setTestSystemProperty(ClientProperties.DEST_SYNTAX, "ADDR"); + setTestSystemProperty(InitialContext.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); + setTestSystemProperty(InitialContext.PROVIDER_URL, "file://" + f.getCanonicalPath()); + + InitialContext context = new InitialContext(); + Destination dest = (Destination) context.lookup("topicExchange"); + assertNotNull("Lookup from URI based context should not be null", dest); + assertTrue("Unexpected value from lookup", dest.toString().contains("destName")); + + ConnectionFactory factory = (ConnectionFactory) context.lookup("qpidConnectionfactory"); + assertTrue("ConnectionFactory was not an instance of AMQConnectionFactory", factory instanceof AMQConnectionFactory); + assertEquals("Unexpected ConnectionURL value", CONNECTION_URL.replaceAll("password", "********"), + ((AMQConnectionFactory)factory).getConnectionURLString()); + + context.close(); + } + finally + { + f.delete(); + } } } diff --git a/java/client/src/test/java/org/apache/qpid/jndi/hello.properties b/java/client/src/test/java/org/apache/qpid/jndi/hello.properties deleted file mode 100644 index d017d137fe..0000000000 --- a/java/client/src/test/java/org/apache/qpid/jndi/hello.properties +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory - -# register some connection factories -# connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://10.0.1.46:5672' - -# Register an AMQP destination in JNDI -# destination.[jniName] = [Address Format] -destination.topicExchange = amq.topic |