summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /java/client/src
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rwxr-xr-xjava/client/src/main/java/client.bnd2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java30
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java39
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java99
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java40
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java63
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClient.java91
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/url/URLParser.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/TopicSubscriber.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java56
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQConnectionFactoryTest.java (renamed from java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java)20
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java78
-rw-r--r--java/client/src/test/java/org/apache/qpid/jndi/hello.properties27
31 files changed, 489 insertions, 213 deletions
diff --git a/java/client/src/main/java/client.bnd b/java/client/src/main/java/client.bnd
index 495ea6793f..4b9b191520 100755
--- a/java/client/src/main/java/client.bnd
+++ b/java/client/src/main/java/client.bnd
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.17.0
+ver: 0.19.0
Bundle-SymbolicName: qpid-client
Bundle-Version: ${ver}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 23b47c8d67..d80858a7a1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -69,6 +69,7 @@ import javax.naming.StringRefAddr;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
+import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
@@ -78,11 +79,14 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
+ private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
+ private final long _connectionNumber;
/**
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
@@ -222,6 +226,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new IllegalArgumentException("Connection must be specified");
}
+ _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Connection(" + _connectionNumber + "):" + connectionURL);
+ }
+
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
@@ -308,11 +319,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_delegate = new AMQConnectionDelegate_0_10(this);
}
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Connection:" + connectionURL);
- }
-
_connectionURL = connectionURL;
_clientName = connectionURL.getClientName();
@@ -1519,4 +1525,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _delegate;
}
+
+ public Long getConnectionNumber()
+ {
+ return _connectionNumber;
+ }
+
+ protected void logConnected(SocketAddress localAddress, SocketAddress remoteAddress)
+ {
+ if(_logger.isInfoEnabled())
+ {
+ _logger.info("Connection " + _connectionNumber + " now connected from "
+ + localAddress + " to " + remoteAddress);
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index a1a06c5547..51e7e4153d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -222,6 +222,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_conn.setUsername(_qpidConnection.getUserID());
_conn.setMaximumChannelCount(_qpidConnection.getChannelMax());
_conn.getFailoverPolicy().attainedConnection();
+ _conn.logConnected(_qpidConnection.getLocalAddress(), _qpidConnection.getRemoteAddress());
}
catch (ProtocolVersionException pe)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 08ee7c3705..e1bf007e83 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -24,13 +24,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.BasicQosBody;
@@ -68,7 +66,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
private final AMQConnection _conn;
-
public void closeConnection(long timeout) throws JMSException, AMQException
{
_conn.getProtocolHandler().closeConnection(timeout);
@@ -110,9 +107,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
sslContext = SSLContextFactory.buildClientContext(
settings.getTrustStorePath(),
settings.getTrustStorePassword(),
+ settings.getTrustStoreType(),
settings.getTrustManagerFactoryAlgorithm(),
settings.getKeyStorePath(),
settings.getKeyStorePassword(),
+ settings.getKeyStoreType(),
settings.getKeyManagerFactoryAlgorithm(),
settings.getCertAlias());
}
@@ -137,6 +136,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
_conn.getFailoverPolicy().attainedConnection();
_conn.setConnected(true);
+ _conn.logConnected(network.getLocalAddress(), network.getRemoteAddress());
return null;
}
else
@@ -283,7 +283,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
+
+ // reset the flow control flag
+ // on opening channel, broker sends flow blocked if virtual host is blocked
+ // if virtual host is not blocked, then broker does not send flow command
+ // that's why we need to reset the flow control flag
+ s.setFlowControl(true);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted());
+
s.resubscribe();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
index cc91746d98..8bc815d98e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
@@ -55,12 +55,13 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
ObjectFactory, Referenceable, XATopicConnectionFactory,
XAQueueConnectionFactory, XAConnectionFactory
{
- private final ConnectionURL _connectionDetails;
+ protected static final String NO_URL_CONFIGURED = "The connection factory wasn't created with a proper URL, the connection details are empty";
+
+ private ConnectionURL _connectionDetails;
// The default constructor is necessary to allow AMQConnectionFactory to be deserialised from JNDI
public AMQConnectionFactory()
{
- _connectionDetails = null;
}
public AMQConnectionFactory(final String url) throws URLSyntaxException
@@ -106,6 +107,11 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
public Connection createConnection() throws JMSException
{
+ if(_connectionDetails == null)
+ {
+ throw new JMSException(NO_URL_CONFIGURED);
+ }
+
try
{
if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals(""))
@@ -158,7 +164,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
}
else
{
- throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty");
+ throw new JMSException(NO_URL_CONFIGURED);
}
}
@@ -193,6 +199,12 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
return _connectionDetails.toString();
}
+ //setter necessary to use instances created with the default constructor (which we can't remove)
+ public final void setConnectionURLString(String url) throws URLSyntaxException
+ {
+ _connectionDetails = new AMQConnectionURL(url);
+ }
+
/**
* JNDI interface to create objects from References.
*
@@ -332,7 +344,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
}
else
{
- throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty");
+ throw new JMSException(NO_URL_CONFIGURED);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 55d3ccb6e7..1468e90c4e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.client;
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,18 +122,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
- public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
-
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
*/
- private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+ private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
+ DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
/**
* The period to wait while flow controlled before declaring a failure
*/
- private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure",
+ private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
private final boolean _delareQueues =
@@ -797,11 +801,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (e instanceof AMQDisconnectedException)
{
- if (_dispatcherThread != null)
- {
- // Failover failed and ain't coming back. Knife the dispatcher.
- _dispatcherThread.interrupt();
- }
+ // Failover failed and ain't coming back. Knife the dispatcher.
+ stopDispatcherThread();
}
@@ -830,6 +831,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ protected void stopDispatcherThread()
+ {
+ if (_dispatcherThread != null)
+ {
+ _dispatcherThread.interrupt();
+ }
+ }
/**
* Commits all messages done in this transaction and releases any locks currently held.
*
@@ -2366,7 +2374,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new Error("Error creating Dispatcher thread",e);
}
- _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
+
+ String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber();
+
+ _dispatcherThread.setName(dispatcherThreadName);
_dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD);
_dispatcher.setConnectionStopped(initiallyStopped);
_dispatcherThread.start();
@@ -3130,6 +3141,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_ticket = ticket;
}
+ public boolean isFlowBlocked()
+ {
+ synchronized (_flowControl)
+ {
+ return !_flowControl.getFlowControl();
+ }
+ }
+
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 3902c726f3..8a7c6b1a01 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -128,7 +128,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Used to store the range of in tx messages
*/
private final RangeSet _txRangeSet = RangeSetFactory.createRangeSet();
- private int _txSize = 0;
+ private int _txSize = 0;
+ private boolean _isHardError = Boolean.getBoolean("qpid.session.legacy_exception_behaviour");
//--- constructors
/**
@@ -390,11 +391,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
- if (flushTask != null)
- {
- flushTask.cancel();
- flushTask = null;
- }
+ cancelTimerTask();
flushAcknowledgments();
try
{
@@ -1051,9 +1048,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
code = ee.getErrorCode().getValue();
}
- AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
+ AMQException amqe = new AMQException(AMQConstant.getConstant(code), _isHardError, se.getMessage(), se.getCause());
_currentException = amqe;
}
+ if (!_isHardError)
+ {
+ cancelTimerTask();
+ stopDispatcherThread();
+ try
+ {
+ closed(_currentException);
+ }
+ catch(Exception e)
+ {
+ _logger.warn("Error closing session", e);
+ }
+ }
getAMQConnection().exceptionReceived(_currentException);
}
@@ -1408,5 +1418,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
+ @Override
+ public boolean isFlowBlocked()
+ {
+ return _qpidSession.isFlowBlocked();
+ }
+
+ private void cancelTimerTask()
+ {
+ if (flushTask != null)
+ {
+ flushTask.cancel();
+ flushTask = null;
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java b/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java
new file mode 100644
index 0000000000..cce6b91781
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQXAResource.java
@@ -0,0 +1,32 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.List;
+
+import javax.transaction.xa.XAResource;
+
+public interface AMQXAResource extends XAResource
+{
+ public String getBrokerUUID();
+
+ public List<XAResource> getSiblings();
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index af9048f1f5..6341510c2f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -17,8 +17,13 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.apache.qpid.dtx.XidImpl;
import org.apache.qpid.transport.DtxXaStatus;
@@ -28,15 +33,13 @@ import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.RecoverResult;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.XaResult;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is an implementation of javax.njms.XAResource.
*/
-public class XAResourceImpl implements XAResource
+public class XAResourceImpl implements AMQXAResource
{
/**
* this XAResourceImpl's logger
@@ -57,9 +60,11 @@ public class XAResourceImpl implements XAResource
* The time for this resource
*/
private int _timeout;
-
+
//--- constructor
-
+
+ private List<XAResource> _siblings = new ArrayList<XAResource>();
+
/**
* Create an XAResource associated with a XASession
*
@@ -157,7 +162,21 @@ public class XAResourceImpl implements XAResource
_xaSession.createSession();
convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
+
checkStatus(result.getStatus());
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings");
+ }
+
+ for(XAResource sibling: _siblings)
+ {
+
+ sibling.end(xid, flag);
+ }
+
+ _siblings.clear();
}
@@ -216,28 +235,38 @@ public class XAResourceImpl implements XAResource
* @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
*/
public boolean isSameRM(XAResource xaResource) throws XAException
- {
+ {
if(this == xaResource)
{
- return true;
- }
- if(!(xaResource instanceof XAResourceImpl))
+ return true;
+ }
+
+ if(!(xaResource instanceof AMQXAResource))
{
- return false;
+ return false;
}
-
- XAResourceImpl other = (XAResourceImpl)xaResource;
- String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
- String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
-
+ String myUUID = getBrokerUUID();
+ String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID();
+
if(_logger.isDebugEnabled())
{
_logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
}
-
- return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
-
+
+ boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
+
+ if(isSameRm)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. ");
+ }
+ _siblings.add(xaResource);
+ }
+
+ return isSameRm;
+
}
/**
@@ -369,12 +398,12 @@ public class XAResourceImpl implements XAResource
{
_timeout = timeout;
if (timeout != _timeout && _xid != null)
- {
+ {
setDtxTimeout(_timeout);
}
return true;
}
-
+
private void setDtxTimeout(int timeout) throws XAException
{
_xaSession.getQpidSession()
@@ -437,18 +466,23 @@ public class XAResourceImpl implements XAResource
{
setDtxTimeout(_timeout);
}
+
+ for(XAResource sibling: _siblings)
+ {
+ sibling.start(xid, flag);
+ }
}
/**
* Is this resource currently enlisted in a transaction?
- *
+ *
* @return true if the resource is associated with a transaction, false otherwise.
*/
public boolean isEnlisted()
{
return (_xid != null) ;
}
-
+
//------------------------------------------------------------------------
// Private methods
//------------------------------------------------------------------------
@@ -517,7 +551,7 @@ public class XAResourceImpl implements XAResource
}
catch (XAException e)
{
- e.printStackTrace();
+ _logger.error(e.getMessage(), e);
throw e;
}
case ILLEGAL_STATE:
@@ -544,7 +578,7 @@ public class XAResourceImpl implements XAResource
* convert a generic xid into qpid format
* @param xid xid to be converted
* @return the qpid formated xid
- * @throws XAException when xid is null
+ * @throws XAException when xid is null
*/
private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException
{
@@ -556,4 +590,13 @@ public class XAResourceImpl implements XAResource
return XidImpl.convert(xid);
}
+ public String getBrokerUUID()
+ {
+ return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
+ }
+
+ public List<XAResource> getSiblings()
+ {
+ return Collections.unmodifiableList(_siblings);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index f2efb6e8a5..05bd121bbd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.transport.RangeSet;
@@ -31,7 +32,7 @@ import javax.jms.XATopicSession;
import javax.transaction.xa.XAResource;
/**
- * This is an implementation of the javax.njms.XASEssion interface.
+ * This is an implementation of the javax.jms.XASession interface.
*/
public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopicSession, XAQueueSession
{
@@ -67,7 +68,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
{
this(qpidConnection, con, channelId, false, ackMode, MessageFactoryRegistry.newDefaultRegistry(),
defaultPrefetchHigh, defaultPrefetchLow, null);
-
+
}
public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
@@ -92,9 +93,6 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
_qpidDtxSession.dtxSelect();
}
-
- // javax.njms.XASEssion API
-
/**
* Gets the session associated with this XASession.
*
@@ -192,4 +190,11 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
super.acknowledgeImpl() ;
}
}
+
+ @Override
+ void resubscribe() throws AMQException
+ {
+ super.resubscribe();
+ createSession();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
index 3a3ddae52f..fa544f2d2e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.handler;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.handler;
* under the License.
*
*/
+package org.apache.qpid.client.handler;
import org.slf4j.Logger;
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
index 919c5f6d67..46b1f08db3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.client.handler;
import org.slf4j.Logger;
@@ -8,26 +28,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ChannelFlowBody;
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
public class ChannelFlowMethodHandler implements StateAwareMethodListener<ChannelFlowBody>
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
index 11d99f5446..69ecd7bfba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.message;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.message;
* under the License.
*
*/
+package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
index f997862bb0..183ec4cb40 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.message;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.message;
* under the License.
*
*/
+package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index d1e43447cc..9e15b08f12 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -321,7 +321,40 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
protected abstract String getMimeType();
+ public String toHeaderString() throws JMSException
+ {
+ StringBuffer buf = new StringBuffer();
+ buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID());
+ buf.append("\nJMS timestamp: ").append(getJMSTimestamp());
+ buf.append("\nJMS expiration: ").append(getJMSExpiration());
+ buf.append("\nJMS priority: ").append(getJMSPriority());
+ buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
+ buf.append("\nJMS reply to: ").append(getReplyToString());
+ buf.append("\nJMS Redelivered: ").append(_redelivered);
+ buf.append("\nJMS Destination: ").append(getJMSDestination());
+ buf.append("\nJMS Type: ").append(getJMSType());
+ buf.append("\nJMS MessageID: ").append(getJMSMessageID());
+ buf.append("\nJMS Content-Type: ").append(getContentType());
+ buf.append("\nAMQ message number: ").append(getDeliveryTag());
+
+ buf.append("\nProperties:");
+ final Enumeration propertyNames = getPropertyNames();
+ if (!propertyNames.hasMoreElements())
+ {
+ buf.append("<NONE>");
+ }
+ else
+ {
+ buf.append('\n');
+ while(propertyNames.hasMoreElements())
+ {
+ String propertyName = (String) propertyNames.nextElement();
+ buf.append("\t").append(propertyName).append(" = ").append(getObjectProperty(propertyName)).append("\n");
+ }
+ }
+ return buf.toString();
+ }
public String toString()
{
@@ -330,35 +363,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
StringBuffer buf = new StringBuffer("Body:\n");
buf.append(toBodyString());
- buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID());
- buf.append("\nJMS timestamp: ").append(getJMSTimestamp());
- buf.append("\nJMS expiration: ").append(getJMSExpiration());
- buf.append("\nJMS priority: ").append(getJMSPriority());
- buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
- buf.append("\nJMS reply to: ").append(getReplyToString());
- buf.append("\nJMS Redelivered: ").append(_redelivered);
- buf.append("\nJMS Destination: ").append(getJMSDestination());
- buf.append("\nJMS Type: ").append(getJMSType());
- buf.append("\nJMS MessageID: ").append(getJMSMessageID());
- buf.append("\nJMS Content-Type: ").append(getContentType());
- buf.append("\nAMQ message number: ").append(getDeliveryTag());
-
- buf.append("\nProperties:");
- final Enumeration propertyNames = getPropertyNames();
- if (!propertyNames.hasMoreElements())
- {
- buf.append("<NONE>");
- }
- else
- {
- buf.append('\n');
- while(propertyNames.hasMoreElements())
- {
- String propertyName = (String) propertyNames.nextElement();
- buf.append("\t").append(propertyName).append(" = ").append(getObjectProperty(propertyName)).append("\n");
- }
-
- }
+ buf.append(toHeaderString());
return buf.toString();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
index bd63cdb5c5..c3f36a545a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.message;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.message;
* under the License.
*
*/
+package org.apache.qpid.client.message;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
index 6e5f33a65c..e253e43a86 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.message;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.message;
* under the License.
*
*/
+package org.apache.qpid.client.message;
import org.apache.qpid.framing.AMQShortString;
diff --git a/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClient.java b/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClient.java
new file mode 100644
index 0000000000..9965176772
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.security.crammd5hashed;
+
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler;
+
+/**
+ * A {@link CRAMMD5HashedSaslClient} merely wraps an instance of a CRAM-MD5 SASL client delegating
+ * all method calls to it, except {@link #getMechanismName()} which returns "CRAM-MD5-HASHED".
+ *
+ * This mechanism must be used with {@link UsernameHashedPasswordCallbackHandler} which is responsible
+ * for the additional hash of the password.
+ */
+public class CRAMMD5HashedSaslClient implements SaslClient
+{
+ private final SaslClient _cramMd5SaslClient;
+
+ public CRAMMD5HashedSaslClient(String authorizationId, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh) throws SaslException
+ {
+ super();
+ String[] mechanisms = {"CRAM-MD5"};
+ _cramMd5SaslClient = Sasl.createSaslClient(mechanisms, authorizationId, protocol, serverName, props, cbh);
+ }
+
+ public void dispose() throws SaslException
+ {
+ _cramMd5SaslClient.dispose();
+ }
+
+ public String getMechanismName()
+ {
+ return CRAMMD5HashedSaslClientFactory.MECHANISM;
+ }
+
+ public byte[] evaluateChallenge(byte[] challenge) throws SaslException
+ {
+ return _cramMd5SaslClient.evaluateChallenge(challenge);
+ }
+
+
+ public Object getNegotiatedProperty(String propName)
+ {
+ return _cramMd5SaslClient.getNegotiatedProperty(propName);
+ }
+
+ public boolean hasInitialResponse()
+ {
+ return _cramMd5SaslClient.hasInitialResponse();
+ }
+
+ public boolean isComplete()
+ {
+ return _cramMd5SaslClient.isComplete();
+ }
+
+ public byte[] unwrap(byte[] incoming, int offset, int len)
+ throws SaslException
+ {
+ return _cramMd5SaslClient.unwrap(incoming, offset, len);
+ }
+
+ public byte[] wrap(byte[] outgoing, int offset, int len)
+ throws SaslException
+ {
+ return _cramMd5SaslClient.wrap(outgoing, offset, len);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java b/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java
index cb989f7919..b3ce1a0d23 100644
--- a/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java
@@ -44,14 +44,13 @@ public class CRAMMD5HashedSaslClientFactory implements SaslClientFactory
throw new SaslException("CallbackHandler must not be null");
}
- String[] mechs = {"CRAM-MD5"};
- return Sasl.createSaslClient(mechs, authorizationId, protocol, serverName, props, cbh);
+ return new CRAMMD5HashedSaslClient(authorizationId, protocol, serverName, props, cbh);
}
}
return null;
}
- public String[] getMechanismNames(Map props)
+ public String[] getMechanismNames(Map<String,?> props)
{
if (props != null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 7d028e022a..0b6217ffce 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -125,9 +125,9 @@ public class AMQStateManager implements AMQMethodListener
*/
public void setProtocolSession(AMQProtocolSession session)
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Setting ProtocolSession:" + session);
+ _logger.debug("Setting ProtocolSession:" + session);
}
_protocolSession = session;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index c8903d252f..fd2f003a56 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -61,7 +61,10 @@ public class StateWaiter extends BlockingWaiter<AMQState>
*/
public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates)
{
- _logger.info("New StateWaiter :" + currentState + ":" + awaitStates);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("New StateWaiter :" + currentState + ":" + awaitStates);
+ }
_stateManager = stateManager;
_awaitStates = awaitStates;
_startState = currentState;
diff --git a/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java b/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
index f303d155c6..516ee8cf37 100644
--- a/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
+++ b/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.url;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.url;
* under the License.
*
*/
+package org.apache.qpid.client.url;
import org.apache.qpid.client.AMQBrokerDetails;
diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
index f4d2ecc36d..0b4f0800d2 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
@@ -33,9 +33,7 @@ public class FailoverPolicy
{
private static final Logger _logger = LoggerFactory.getLogger(FailoverPolicy.class);
- private static final long MINUTE = 60000L;
-
- private static final long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE;
+ private final long DEFAULT_METHOD_TIMEOUT = Long.getLong("qpid.failover_method_timeout", 120000);
private FailoverMethod[] _methods = new FailoverMethod[1];
diff --git a/java/client/src/main/java/org/apache/qpid/jms/TopicSubscriber.java b/java/client/src/main/java/org/apache/qpid/jms/TopicSubscriber.java
index 1dbe464230..fe43ae8cd0 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/TopicSubscriber.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/TopicSubscriber.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.jms;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.jms;
* under the License.
*
*/
+package org.apache.qpid.jms;
import org.apache.qpid.AMQException;
diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index 9b202a13ee..f17fb9b5f5 100644
--- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -69,46 +69,21 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
public Context getInitialContext(Hashtable environment) throws NamingException
{
Map data = new ConcurrentHashMap();
-
+ BufferedInputStream inputStream = null;
try
{
- String file = null;
-
- if (environment.containsKey(Context.PROVIDER_URL))
- {
- file = (String) environment.get(Context.PROVIDER_URL);
- }
- else
- {
- file = System.getProperty(Context.PROVIDER_URL);
- }
+ String fileName = (environment.containsKey(Context.PROVIDER_URL))
+ ? (String)environment.get(Context.PROVIDER_URL) : System.getProperty(Context.PROVIDER_URL);
- // Load the properties specified
- if (file != null)
+ if (fileName != null)
{
- _logger.info("Loading Properties from:" + file);
- BufferedInputStream inputStream = null;
+ _logger.info("Attempting to load " + fileName);
- if(file.contains("file:"))
- {
- inputStream = new BufferedInputStream(new FileInputStream(new File(new URI(file))));
- }
- else
- {
- inputStream = new BufferedInputStream(new FileInputStream(file));
- }
-
+ inputStream = new BufferedInputStream(new FileInputStream((fileName.contains("file:"))
+ ? new File(new URI(fileName)) : new File(fileName)));
Properties p = new Properties();
-
- try
- {
- p.load(inputStream);
- }
- finally
- {
- inputStream.close();
- }
+ p.load(inputStream);
Strings.Resolver resolver = new Strings.ChainedResolver
(Strings.SYSTEM_RESOLVER, new Strings.PropertiesResolver(p));
@@ -134,12 +109,23 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
catch (IOException ioe)
{
_logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" +
- "Due to:"+ioe.getMessage());
+ "Due to:" + ioe.getMessage());
}
catch(URISyntaxException uoe)
{
_logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" +
- "Due to:"+uoe.getMessage());
+ "Due to:" + uoe.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ if(inputStream != null)
+ {
+ inputStream.close();
+ }
+ }
+ catch(Exception ignore){}
}
createConnectionFactories(data, environment);
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
index 7134f0a960..e8455f6dfa 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.nclient.util;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.nclient.util;
* under the License.
*
*/
+package org.apache.qpid.nclient.util;
import org.apache.qpid.api.Message;
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
index 9a2e9de3d9..a3fce7611f 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.nclient.util;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.nclient.util;
* under the License.
*
*/
+package org.apache.qpid.nclient.util;
import org.apache.qpid.nclient.MessagePartListener;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionFactoryTest.java
index 20496026ce..bb92fa4ecd 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQConnectionFactoryTest.java
@@ -18,7 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.test.unit.jndi;
+package org.apache.qpid.client;
+
+import javax.jms.JMSException;
import junit.framework.TestCase;
@@ -26,7 +28,7 @@ import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
-public class ConnectionFactoryTest extends TestCase
+public class AMQConnectionFactoryTest extends TestCase
{
//URL will be returned with the password field swapped for '********'
@@ -58,6 +60,20 @@ public class ConnectionFactoryTest extends TestCase
assertEquals("tcp", service.getTransport());
assertEquals("localhost", service.getHost());
assertEquals(5672, service.getPort());
+ }
+
+ public void testInstanceCreatedWithDefaultConstructorThrowsExceptionOnCallingConnectWithoutSettingURL() throws Exception
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory();
+ try
+ {
+ factory.createConnection();
+ fail("Expected exception not thrown");
+ }
+ catch(JMSException e)
+ {
+ assertEquals("Unexpected exception", AMQConnectionFactory.NO_URL_CONFIGURED, e.getMessage());
+ }
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java b/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
index 1fbd7cf212..c535fdd705 100644
--- a/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.message;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.message;
* under the License.
*
*/
+package org.apache.qpid.client.message;
import junit.framework.TestCase;
diff --git a/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java b/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java
index 2989970dcd..ce9e681eaf 100644
--- a/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java
+++ b/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java
@@ -21,51 +21,47 @@
package org.apache.qpid.jndi;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.util.Properties;
+import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.ConfigurationException;
import javax.naming.Context;
import javax.naming.InitialContext;
+import javax.naming.NamingException;
-import junit.framework.TestCase;
-
+import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class PropertiesFileInitialContextFactoryTest extends TestCase
+public class PropertiesFileInitialContextFactoryTest extends QpidTestCase
{
- private static final String FILE_URL_PATH = System.getProperty("user.dir") + "/client/src/test/java/org/apache/qpid/jndi/";
- private static final String FILE_NAME = "hello.properties";
-
- private Context ctx;
-
- protected void setUp() throws Exception
- {
- Properties properties = new Properties();
- properties.load(this.getClass().getResourceAsStream("JNDITest.properties"));
-
- //Create the initial context
- ctx = new InitialContext(properties);
- }
-
+ private static final String CONNECTION_URL = "amqp://username:password@clientid/test?brokerlist='tcp://testContextFromProviderURL:5672'";
public void testQueueNamesWithTrailingSpaces() throws Exception
{
+ Context ctx = prepareContext();
Queue queue = (Queue)ctx.lookup("QueueNameWithSpace");
assertEquals("QueueNameWithSpace",queue.getQueueName());
}
public void testTopicNamesWithTrailingSpaces() throws Exception
{
+ Context ctx = prepareContext();
Topic topic = (Topic)ctx.lookup("TopicNameWithSpace");
assertEquals("TopicNameWithSpace",topic.getTopicName());
}
public void testMultipleTopicNamesWithTrailingSpaces() throws Exception
{
+ Context ctx = prepareContext();
Topic topic = (Topic)ctx.lookup("MultipleTopicNamesWithSpace");
int i = 0;
for (AMQShortString bindingKey: ((AMQDestination)topic).getBindingKeys())
@@ -83,13 +79,59 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase
try
{
- ctx = new InitialContext(properties);
+ new InitialContext(properties);
fail("A configuration exception should be thrown with details about the address syntax error");
}
catch(ConfigurationException e)
{
assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}"));
}
+ }
+
+ private InitialContext prepareContext() throws IOException, NamingException
+ {
+ Properties properties = new Properties();
+ properties.load(this.getClass().getResourceAsStream("JNDITest.properties"));
+ return new InitialContext(properties);
+ }
+
+ /**
+ * Test loading of a JNDI properties file through use of a file:// URL
+ * supplied via the InitialContext.PROVIDER_URL system property.
+ */
+ public void testContextFromProviderURL() throws Exception
+ {
+ Properties properties = new Properties();
+ properties.put("connectionfactory.qpidConnectionfactory", CONNECTION_URL);
+ properties.put("destination.topicExchange", "destName");
+
+ File f = File.createTempFile(getTestName(), ".properties");
+ try
+ {
+ FileOutputStream fos = new FileOutputStream(f);
+ properties.store(fos, null);
+ fos.close();
+
+ setTestSystemProperty(ClientProperties.DEST_SYNTAX, "ADDR");
+ setTestSystemProperty(InitialContext.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+ setTestSystemProperty(InitialContext.PROVIDER_URL, "file://" + f.getCanonicalPath());
+
+ InitialContext context = new InitialContext();
+ Destination dest = (Destination) context.lookup("topicExchange");
+ assertNotNull("Lookup from URI based context should not be null", dest);
+ assertTrue("Unexpected value from lookup", dest.toString().contains("destName"));
+
+ ConnectionFactory factory = (ConnectionFactory) context.lookup("qpidConnectionfactory");
+ assertTrue("ConnectionFactory was not an instance of AMQConnectionFactory", factory instanceof AMQConnectionFactory);
+ assertEquals("Unexpected ConnectionURL value", CONNECTION_URL.replaceAll("password", "********"),
+ ((AMQConnectionFactory)factory).getConnectionURLString());
+
+ context.close();
+ }
+ finally
+ {
+ f.delete();
+ }
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/jndi/hello.properties b/java/client/src/test/java/org/apache/qpid/jndi/hello.properties
deleted file mode 100644
index d017d137fe..0000000000
--- a/java/client/src/test/java/org/apache/qpid/jndi/hello.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
-
-# register some connection factories
-# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://10.0.1.46:5672'
-
-# Register an AMQP destination in JNDI
-# destination.[jniName] = [Address Format]
-destination.topicExchange = amq.topic