summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2013-06-21 17:06:57 +0000
committerAlex Rudyy <orudyy@apache.org>2013-06-21 17:06:57 +0000
commit4b461c11af414b57735df739d96a7d2a78385f99 (patch)
treea168a402c5f8af6104a6e0bf9be6390262572590 /java
parent5e17dc3e784dc15a9a3ce588317d0fe6ada2e52a (diff)
downloadqpid-python-4b461c11af414b57735df739d96a7d2a78385f99.tar.gz
QPID-4943: Introduce a feature for 0-8/0-9/0-9-1 protocols to close a connection on receiving a mandatory unroutable message in a transacted session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1495511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Broker.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java147
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java7
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java74
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java13
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java77
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQException.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java9
-rw-r--r--java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java26
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java9
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java34
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java223
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java93
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java178
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java5
-rwxr-xr-xjava/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java22
-rwxr-xr-xjava/test-profiles/Java010Excludes4
24 files changed, 948 insertions, 112 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8d88ee902a..8588aea2d4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -324,14 +325,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
if(destinationQueues == null || destinationQueues.isEmpty())
{
- if (_currentMessage.isMandatory() || _currentMessage.isImmediate())
- {
- _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage));
- }
- else
- {
- _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
- }
+ handleUnroutableMessage();
}
else
{
@@ -378,6 +372,61 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
+ /**
+ * Either throws a {@link AMQConnectionException} or returns the message
+ *
+ * Pre-requisite: the current message is judged to have no destination queues.
+ *
+ * @throws AMQConnectionException if the message is mandatoryclose-on-no-route
+ * @see AMQProtocolSession#isCloseWhenNoRoute()
+ */
+ private void handleUnroutableMessage() throws AMQConnectionException
+ {
+ boolean mandatory = _currentMessage.isMandatory();
+ String description = currentMessageDescription();
+ boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(String.format(
+ "Unroutable message %s, mandatory=%s, transactionalSession=%s, closeOnNoRoute=%s",
+ description, mandatory, isTransactional(), closeOnNoRoute));
+ }
+
+ if (mandatory && isTransactional() && _session.isCloseWhenNoRoute())
+ {
+ throw new AMQConnectionException(
+ AMQConstant.NO_ROUTE,
+ "No route for message " + currentMessageDescription(),
+ 0, 0, // default class and method ids
+ getProtocolSession().getProtocolVersion().getMajorVersion(),
+ getProtocolSession().getProtocolVersion().getMinorVersion(),
+ (Throwable) null);
+ }
+
+ if (mandatory || _currentMessage.isImmediate())
+ {
+ _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), _currentMessage));
+ }
+ else
+ {
+ _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
+ }
+ }
+
+ private String currentMessageDescription()
+ {
+ if(_currentMessage == null || !_currentMessage.allContentReceived())
+ {
+ throw new IllegalStateException("Cannot create message description for message: " + _currentMessage);
+ }
+
+ return String.format(
+ "[Exchange: %s, Routing key: %s]",
+ _currentMessage.getExchange(),
+ _currentMessage.getRoutingKey());
+ }
+
public void publishContentBody(ContentBody contentBody) throws AMQException
{
if (_currentMessage == null)
@@ -522,6 +571,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*
* @throws AMQException if there is an error during closure
*/
+ @Override
public void close() throws AMQException
{
if(!_closing.compareAndSet(false, true))
@@ -1344,7 +1394,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_message,
_channelId,
_errorCode.getCode(),
- new AMQShortString(_description));
+ AMQShortString.valueOf(_description, true, true));
}
catch (AMQException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
index 24fd687240..8f565362b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -73,6 +73,7 @@ public interface Broker extends ConfiguredObject
String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit";
String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
+ String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
String VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD = "virtualhost.housekeepingCheckPeriod";
String VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "virtualhost.storeTransactionIdleTimeoutClose";
@@ -113,6 +114,7 @@ public interface Broker extends ConfiguredObject
VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD,
CONNECTION_SESSION_COUNT_LIMIT,
CONNECTION_HEART_BEAT_DELAY,
+ CONNECTION_CLOSE_WHEN_NO_ROUTE,
STATISTICS_REPORTING_PERIOD,
STATISTICS_REPORTING_RESET_ENABLED,
STORE_TYPE,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 678db43d58..ff9cac9a21 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -92,6 +92,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
put(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
put(CONNECTION_SESSION_COUNT_LIMIT, Integer.class);
put(CONNECTION_HEART_BEAT_DELAY, Integer.class);
+ put(CONNECTION_CLOSE_WHEN_NO_ROUTE, Boolean.class);
put(STATISTICS_REPORTING_PERIOD, Integer.class);
put(NAME, String.class);
@@ -124,6 +125,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 0l;
public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l;
public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 0l;
+ public static final boolean DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE = true;
@SuppressWarnings("serial")
private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{
@@ -141,6 +143,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD);
put(Broker.CONNECTION_HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY);
put(Broker.CONNECTION_SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT);
+ put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE);
put(Broker.NAME, DEFAULT_NAME);
put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE);
put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index bbf90fad86..92d6683415 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -27,8 +27,10 @@ import java.nio.ByteBuffer;
import java.security.Principal;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -39,6 +41,7 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
@@ -47,7 +50,24 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -102,6 +122,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ /**
+ * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
+ * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
+ * on after handling the frames.
+ *
+ * Thread-safety: guarded by {@link #_receivedLock}.
+ */
+ private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>();
+
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
private final AMQStateManager _stateManager;
@@ -160,6 +189,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final Broker _broker;
private final Transport _transport;
+ private volatile boolean _closeWhenNoRoute;
public AMQProtocolEngine(Broker broker,
NetworkConnection network,
@@ -184,6 +214,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false));
+ _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE);
+
initialiseStatistics();
}
@@ -253,17 +285,26 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
dataBlockReceived(dataBlock);
}
+ catch(AMQConnectionException e)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e);
+ }
+ break;
+ }
catch (Exception e)
{
_logger.error("Unexpected exception when processing datablock", e);
closeProtocolSession();
+ break;
}
}
- receiveComplete();
+ receivedComplete();
}
catch (Exception e)
{
- _logger.error("Unexpected exception when processing datablock", e);
+ _logger.error("Unexpected exception when processing datablocks", e);
closeProtocolSession();
}
finally
@@ -272,16 +313,45 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- private void receiveComplete()
+ private void receivedComplete() throws AMQException
{
- for (AMQChannel channel : _channelMap.values())
+ Exception exception = null;
+ for (AMQChannel channel : _channelsForCurrentMessage)
{
- channel.receivedComplete();
+ try
+ {
+ channel.receivedComplete();
+ }
+ catch(Exception exceptionForThisChannel)
+ {
+ if(exception == null)
+ {
+ exception = exceptionForThisChannel;
+ }
+ _logger.error("Error informing channel that receiving is complete. Channel: " + channel, exceptionForThisChannel);
+ }
}
+ _channelsForCurrentMessage.clear();
+
+ if(exception != null)
+ {
+ throw new AMQException(
+ AMQConstant.INTERNAL_ERROR,
+ "Error informing channel that receiving is complete: " + exception.getMessage(),
+ exception);
+ }
}
- public void dataBlockReceived(AMQDataBlock message) throws Exception
+ /**
+ * Process the data block.
+ * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}.
+ *
+ * @throws an AMQConnectionException if unable to process the data block. In this case,
+ * the connection is already closed by the time the exception is thrown. If any other
+ * type of exception is thrown, the connection is not already closed.
+ */
+ private void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
if (message instanceof ProtocolInitiation)
@@ -301,18 +371,40 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
+ /**
+ * Handle the supplied frame.
+ * Adds this frame's channel to {@link #_channelsForCurrentMessage}.
+ *
+ * @throws an AMQConnectionException if unable to process the data block. In this case,
+ * the connection is already closed by the time the exception is thrown. If any other
+ * type of exception is thrown, the connection is not already closed.
+ */
private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
+ AMQChannel amqChannel = _channelMap.get(channelId);
+ if(amqChannel != null)
+ {
+ // The _receivedLock is already aquired in the caller
+ // It is safe to add channel
+ _channelsForCurrentMessage.add(amqChannel);
+ }
+ else
+ {
+ // Not an error. The frame is probably a channel Open for this channel id, which
+ // does not require asynchronous work therefore its absence from
+ // _channelsForCurrentMessage is ok.
+ }
+
AMQBody body = frame.getBodyFrame();
//Look up the Channel's Actor and set that as the current actor
// If that is not available then we can use the ConnectionActor
// that is associated with this AMQMPSession.
LogActor channelActor = null;
- if (_channelMap.get(channelId) != null)
+ if (amqChannel != null)
{
- channelActor = _channelMap.get(channelId).getLogActor();
+ channelActor = amqChannel.getLogActor();
}
CurrentActor.set(channelActor == null ? _actor : channelActor);
@@ -349,6 +441,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
body.handle(channelId, this);
}
+ catch(AMQConnectionException e)
+ {
+ _logger.info(e.getMessage() + " whilst processing frame: " + body);
+ closeConnection(channelId, e);
+ throw e;
+ }
catch (AMQException e)
{
closeChannel(channelId);
@@ -400,6 +498,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
QpidProperties.getBuildVersion());
serverProperties.setString(ServerPropertyNames.QPID_INSTANCE_NAME,
_broker.getName());
+ serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE,
+ String.valueOf(_closeWhenNoRoute));
AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) pv.getActualMinorVersion(),
@@ -720,6 +820,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
+ @Override
public void closeChannel(int channelId) throws AMQException
{
final AMQChannel channel = getChannel(channelId);
@@ -819,12 +920,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
+ @Override
public void closeSession() throws AMQException
{
if(_closing.compareAndSet(false,true))
{
// force sync of outstanding async work
- receiveComplete();
+ _receivedLock.lock();
+ try
+ {
+ receivedComplete();
+ }
+ finally
+ {
+ _receivedLock.unlock();
+ }
// REMOVE THIS SHOULD NOT BE HERE.
if (CurrentActor.get() == null)
@@ -900,6 +1010,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
+ @Override
public void closeProtocolSession()
{
_network.close();
@@ -968,6 +1079,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_clientProperties = clientProperties;
if (_clientProperties != null)
{
+ Boolean closeWhenNoRoute = _clientProperties.getBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
+ if (closeWhenNoRoute != null)
+ {
+ _closeWhenNoRoute = closeWhenNoRoute;
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Client set closeWhenNoRoute=" + _closeWhenNoRoute + " for protocol engine " + this);
+ }
+ }
+
_clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
if (_clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8) != null)
@@ -1538,4 +1659,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
return _lastWriteTime.get();
}
+
+ @Override
+ public boolean isCloseWhenNoRoute()
+ {
+ return _closeWhenNoRoute;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index ba806c04bd..1842117d6f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -218,4 +218,11 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
public Principal getPeerPrincipal();
Lock getReceivedLock();
+
+ /**
+ * Used for 0-8/0-9/0-9-1 connections to choose to close
+ * the connection when a transactional session receives a 'mandatory' message which
+ * can't be routed rather than returning the message.
+ */
+ boolean isCloseWhenNoRoute();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 8de19d9cff..1c8939d117 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -73,6 +73,7 @@ public class BrokerTestHelper
when(subjectCreator.getMechanisms()).thenReturn("");
Broker broker = mock(Broker.class);
when(broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT)).thenReturn(1);
+ when(broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(false);
when(broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(10000l);
when(broker.getId()).thenReturn(UUID.randomUUID());
when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 4e885258b9..74c9878a8e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -844,7 +844,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- public void close() throws JMSException
+ public void close() throws JMSException
{
close(DEFAULT_TIMEOUT);
}
@@ -859,9 +859,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (!setClosed())
{
setClosing(true);
- try{
+ try
+ {
doClose(sessions, timeout);
- }finally{
+ }
+ finally
+ {
setClosing(false);
}
}
@@ -1594,4 +1597,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _validateQueueOnSend;
}
+
+ @Override
+ protected boolean setClosed()
+ {
+ return super.setClosed();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java b/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
new file mode 100644
index 0000000000..baae072167
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.properties.ConnectionStartProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used during connection establishment to optionally set the "close when no route" client property
+ */
+class CloseWhenNoRouteSettingsHelper
+{
+ private static final Logger _log = LoggerFactory.getLogger(CloseWhenNoRouteSettingsHelper.class);
+
+ /**
+ * @param url the client's connection URL which may contain the option
+ * {@value ConnectionStartProperties#QPID_CLOSE_WHEN_NO_ROUTE}
+ * @param serverProperties the properties received from the broker which may contain the option
+ * {@value ConnectionStartProperties#QPID_CLOSE_WHEN_NO_ROUTE}
+ * @param clientProperties the client properties to optionally set the close-when-no-route option on
+ */
+ public void setClientProperties(FieldTable clientProperties, ConnectionURL url, FieldTable serverProperties)
+ {
+ boolean brokerSupportsCloseWhenNoRoute =
+ serverProperties != null && serverProperties.containsKey(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
+ boolean brokerCloseWhenNoRoute = brokerSupportsCloseWhenNoRoute &&
+ Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE));
+
+ String closeWhenNoRouteOption = url.getOption(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE);
+ if(closeWhenNoRouteOption != null)
+ {
+ if(brokerSupportsCloseWhenNoRoute)
+ {
+ boolean desiredCloseWhenNoRoute = Boolean.valueOf(closeWhenNoRouteOption);
+ if(desiredCloseWhenNoRoute != brokerCloseWhenNoRoute)
+ {
+ clientProperties.setBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, desiredCloseWhenNoRoute);
+ _log.debug(
+ "Set client property {} to {}",
+ ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, desiredCloseWhenNoRoute);
+ }
+ else
+ {
+ _log.debug(
+ "Client's desired {} value {} already matches the server's",
+ ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, desiredCloseWhenNoRoute);
+ }
+ }
+ else
+ {
+ _log.warn("The broker being connected to does not support the " + ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE + " option");
+ }
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 66c4821f60..366b5f115e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -37,6 +37,7 @@ import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.properties.ConnectionStartProperties;
import javax.security.sasl.Sasl;
@@ -51,6 +52,8 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
private static final ConnectionStartMethodHandler _instance = new ConnectionStartMethodHandler();
+ private final CloseWhenNoRouteSettingsHelper _closeWhenNoRouteHelper = new CloseWhenNoRouteSettingsHelper();
+
public static ConnectionStartMethodHandler getInstance()
{
return _instance;
@@ -59,6 +62,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
private ConnectionStartMethodHandler()
{ }
+ @Override
public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId)
throws AMQException
{
@@ -147,6 +151,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
}
session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED);
+
FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.setString(ConnectionStartProperties.CLIENT_ID_0_8,
@@ -162,12 +167,16 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
clientProperties.setInteger(ConnectionStartProperties.PID,
ConnectionStartProperties.getPID());
+ FieldTable serverProperties = body.getServerProperties();
+ ConnectionURL url = getConnectionURL(session);
+ _closeWhenNoRouteHelper.setClientProperties(clientProperties, url, serverProperties);
+
ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales));
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
session.writeFrame(connectionStartOkBody.generateFrame(channelId));
-
+
}
catch (UnsupportedEncodingException e)
{
@@ -195,7 +204,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
try
{
AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism);
- instance.initialise(protocolSession.getAMQConnection().getConnectionURL());
+ instance.initialise(getConnectionURL(protocolSession));
return instance;
}
@@ -205,4 +214,8 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
}
}
+ private ConnectionURL getConnectionURL(AMQProtocolSession protocolSession)
+ {
+ return protocolSession.getAMQConnection().getConnectionURL();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index c4fbeb5607..3050e84419 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -57,6 +57,19 @@ public interface ConnectionURL
* the DLQ (or dropped) when delivery count exceeds the maximum.
*/
public static final String OPTIONS_REJECT_BEHAVIOUR = "rejectbehaviour";
+
+ /**
+ * <p>
+ * This option is only applicable for 0-8/0-9/0-9-1 protocol connections.
+ * </p>
+ * <p>
+ * It tells the client to request whether the broker should close the
+ * connection when a mandatory message isn't routable, rather than return
+ * the message to the client as it normally would.
+ * </p>
+ */
+ public static final String OPTIONS_CLOSE_WHEN_NO_ROUTE = "closeWhenNoRoute";
+
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
diff --git a/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java b/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java
new file mode 100644
index 0000000000..f1d7a76c75
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.handler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class CloseWhenNoRouteSettingsHelperTest extends QpidTestCase
+{
+ private static final String FALSE_STR = Boolean.toString(false);
+ private static final String TRUE_STR = Boolean.toString(true);
+
+ private final CloseWhenNoRouteSettingsHelper _closeWhenNoRouteSettingsHelper = new CloseWhenNoRouteSettingsHelper();
+
+ public void testCloseWhenNoRouteNegotiation()
+ {
+ test("Nothing should be set if option not in URL",
+ null,
+ true,
+ null);
+ test("Client should disable broker's enabled option",
+ FALSE_STR,
+ true,
+ false);
+ test("Client should be able to disable broker's enabled option",
+ TRUE_STR,
+ false,
+ true);
+ test("Client should not enable option if unsupported by broker",
+ TRUE_STR,
+ null,
+ null);
+ test("Malformed client option should evaluate to false",
+ "malformed boolean",
+ true,
+ false);
+ }
+
+ private void test(String message, String urlOption, Boolean serverOption, Boolean expectedClientProperty)
+ {
+ ConnectionURL url = mock(ConnectionURL.class);
+ when(url.getOption(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE)).thenReturn(urlOption);
+
+ FieldTable serverProperties = new FieldTable();
+ if(serverOption != null)
+ {
+ serverProperties.setBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, serverOption);
+ }
+
+ FieldTable clientProperties = new FieldTable();
+
+ _closeWhenNoRouteSettingsHelper.setClientProperties(clientProperties, url, serverProperties);
+
+ assertEquals(message, expectedClientProperty, clientProperties.getBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE));
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java
index 2d54e35191..40ecc3a946 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQException.java
@@ -149,10 +149,6 @@ public class AMQException extends Exception
public AMQShortString getMessageAsShortString()
{
String message = getMessage();
- if (message != null && message.length() > AMQShortString.MAX_LENGTH)
- {
- message = message.substring(0, AMQShortString.MAX_LENGTH - 3) + "...";
- }
- return new AMQShortString(message);
+ return AMQShortString.valueOf(message, true, true);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index b577c916c6..4adc59b158 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -794,9 +794,30 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return false; //To change body of created methods use File | Settings | File Templates.
}
+ public static AMQShortString valueOf(Object obj, boolean truncate, boolean nullAsEmptyString)
+ {
+ if (obj == null)
+ {
+ if (nullAsEmptyString)
+ {
+ return EMPTY_STRING;
+ }
+ return null;
+ }
+ else
+ {
+ String value = String.valueOf(obj);
+ if (truncate && value.length() > AMQShortString.MAX_LENGTH)
+ {
+ value = value.substring(0, AMQShortString.MAX_LENGTH - 3) + "...";
+ }
+ return valueOf(value);
+ }
+ }
+
public static AMQShortString valueOf(Object obj)
{
- return obj == null ? null : AMQShortString.valueOf(String.valueOf(obj));
+ return valueOf(obj, false, false);
}
public static AMQShortString valueOf(String obj)
diff --git a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
index 59a1b6c5b0..6f9d872f98 100644
--- a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
@@ -26,13 +26,20 @@ import java.lang.management.RuntimeMXBean;
import org.apache.qpid.transport.util.Logger;
/**
- * Constants for the various properties 0-10 clients can
+ * Constants for the various properties clients can
* set values for during the ConnectionStartOk reply.
*/
public class ConnectionStartProperties
{
private static final Logger LOGGER = Logger.get(ConnectionStartProperties.class);
+ /**
+ * Used for 0-8/0-9/0-9-1 connections to choose to close
+ * the connection when a transactional session receives a 'mandatory' message which
+ * can't be routed rather than returning the message.
+ */
+ public static final String QPID_CLOSE_WHEN_NO_ROUTE = "qpid.close_when_no_route";
+
public static final String CLIENT_ID_0_10 = "clientName";
public static final String CLIENT_ID_0_8 = "instance";
diff --git a/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java
index 0f8fbf0685..61ac04213e 100644
--- a/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java
+++ b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java
@@ -300,6 +300,32 @@ public class AMQShortStringTest extends TestCase
assertEquals("join result differs from expected", expected.toString(), result.asString());
}
+ public void testValueOf()
+ {
+ String string = buildString('a', 255);
+ AMQShortString shortString = AMQShortString.valueOf(string, true, true);
+ assertEquals("Unexpected string from valueOf", string, shortString.asString());
+ }
+
+ public void testValueOfTruncated()
+ {
+ String string = buildString('a', 256);
+ AMQShortString shortString = AMQShortString.valueOf(string, true, true);
+ assertEquals("Unexpected truncated string from valueOf", string.substring(0, AMQShortString.MAX_LENGTH -3) + "...", shortString.asString());
+ }
+
+ public void testValueOfNulAsEmptyString()
+ {
+ AMQShortString shortString = AMQShortString.valueOf(null, true, true);
+ assertEquals("Unexpected empty string from valueOf", AMQShortString.EMPTY_STRING, shortString);
+ }
+
+ public void testValueOfNullAsNull()
+ {
+ AMQShortString shortString = AMQShortString.valueOf(null, true, false);
+ assertEquals("Unexpected null string from valueOf", null, shortString);
+ }
+
/**
* A helper method to generate a string with given length containing given
* character
diff --git a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
index 8324ac74a5..00c85e80c8 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
@@ -19,6 +19,8 @@
package org.apache.qpid.server.security.acl;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
@@ -306,7 +308,11 @@ public class ExternalACLTest extends AbstractACLTestCase
conn.start();
- MessageProducer sender = sess.createProducer(sess.createQueue("example.RequestQueue"));
+ Queue queue = sess.createQueue("example.RequestQueue");
+
+ ((AMQSession<?,?>)sess).declareAndBind((AMQDestination)queue);
+
+ MessageProducer sender = sess.createProducer(queue);
sender.send(sess.createTextMessage("test"));
@@ -316,7 +322,6 @@ public class ExternalACLTest extends AbstractACLTestCase
conn.close();
}
-
public void setUpRequestResponseSuccess() throws Exception
{
// The group "messaging-users", referenced in the ACL below, is currently defined
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java
index 99ee99e8c5..cc662bddca 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java
@@ -14,6 +14,7 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -40,6 +41,9 @@ public class ExchangeManagementTest extends QpidBrokerTestCase
{
getBrokerConfiguration().addJmxManagementConfiguration();
+ // to test exchange selectors the publishing of unroutable messages should be allowed
+ getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
+
_jmxUtils = new JMXTestUtils(this);
super.setUp();
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
index 0d40eca745..1d2f6e3427 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
@@ -27,6 +27,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator;
import org.apache.qpid.server.model.Broker;
@@ -35,7 +40,7 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.client.UnroutableMessageTestExceptionListener;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
public class BrokerRestTest extends QpidRestTestCase
@@ -160,6 +165,33 @@ public class BrokerRestTest extends QpidRestTestCase
assertEquals("Unexpected update response for flow resume size > flow size", 409, response);
}
+ public void testSetCloseOnNoRoute() throws Exception
+ {
+ Map<String, Object> brokerDetails = getRestTestHelper().getJsonAsSingletonList("/rest/broker");
+ assertTrue("closeOnNoRoute should be true", (Boolean)brokerDetails.get(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE));
+
+ Map<String, Object> brokerAttributes = new HashMap<String, Object>();
+ brokerAttributes.put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
+
+ int response = getRestTestHelper().submitRequest("/rest/broker", "PUT", brokerAttributes);
+ assertEquals("Unexpected update response", 200, response);
+
+ brokerDetails = getRestTestHelper().getJsonAsSingletonList("/rest/broker");
+ assertFalse("closeOnNoRoute should be false", (Boolean)brokerDetails.get(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE));
+
+ Connection connection = getConnection();
+ UnroutableMessageTestExceptionListener exceptionListener = new UnroutableMessageTestExceptionListener();
+ connection.setExceptionListener(exceptionListener);
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(getTestQueue());
+ TextMessage message = session.createTextMessage("Test");
+ producer.send(message);
+
+ session.commit();
+
+ exceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+ }
+
private Map<String, Object> getValidBrokerAttributes()
{
Map<String, Object> brokerAttributes = new HashMap<String, Object>();
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java
new file mode 100644
index 0000000000..a8a72c20fc
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.test.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Tests the broker's connection-closing behaviour when it receives an unroutable message
+ * on a transactional session.
+ *
+ * @see ImmediateAndMandatoryPublishingTest for more general tests of mandatory and immediate publishing
+ */
+public class CloseOnNoRouteForMandatoryMessageTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private UnroutableMessageTestExceptionListener _testExceptionListener = new UnroutableMessageTestExceptionListener();
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void testNoRoute_brokerClosesConnection() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(true);
+
+ Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ String testQueueName = getTestQueueName();
+ MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer(
+ transactedSession.createQueue(testQueueName),
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+ try
+ {
+ transactedSession.commit();
+ fail("Expected exception not thrown");
+ }
+ catch(JMSException e)
+ {
+ _testExceptionListener.assertNoRoute(e, testQueueName);
+ }
+ _testExceptionListener.assertReceivedNoRoute(testQueueName);
+
+ forgetConnection(_connection);
+ }
+
+ public void testCloseOnNoRouteWhenExceptionMessageLengthIsGreater255() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(true);
+
+ AMQSession<?, ?> transactedSession = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ StringBuilder longExchangeName = getLongExchangeName();
+
+ AMQShortString exchangeName = new AMQShortString(longExchangeName.toString());
+ transactedSession.declareExchange(exchangeName, new AMQShortString("direct"), false);
+
+ Destination testQueue = new AMQQueue(exchangeName, getTestQueueName());
+ MessageProducer mandatoryProducer = transactedSession.createProducer(
+ testQueue,
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+ try
+ {
+ transactedSession.commit();
+ fail("Expected exception not thrown");
+ }
+ catch (JMSException e)
+ {
+ AMQException noRouteException = (AMQException) e.getLinkedException();
+ assertNotNull("AMQException should be linked to JMSException", noRouteException);
+
+ assertEquals(AMQConstant.NO_ROUTE, noRouteException.getErrorCode());
+ String expectedMessage = "Error: No route for message [Exchange: " + longExchangeName.substring(0, 220) + "...";
+ assertEquals("Unexpected exception message: " + noRouteException.getMessage(), expectedMessage,
+ noRouteException.getMessage());
+ }
+ finally
+ {
+ forgetConnection(_connection);
+ }
+ }
+
+ public void testNoRouteMessageReurnedWhenExceptionMessageLengthIsGreater255() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(false);
+
+ AMQSession<?, ?> transactedSession = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ StringBuilder longExchangeName = getLongExchangeName();
+
+ AMQShortString exchangeName = new AMQShortString(longExchangeName.toString());
+ transactedSession.declareExchange(exchangeName, new AMQShortString("direct"), false);
+
+ AMQQueue testQueue = new AMQQueue(exchangeName, getTestQueueName());
+ MessageProducer mandatoryProducer = transactedSession.createProducer(
+ testQueue,
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+ transactedSession.commit();
+ _testExceptionListener.assertReceivedReturnedMessageWithLongExceptionMessage(message, testQueue);
+ }
+
+ private StringBuilder getLongExchangeName()
+ {
+ StringBuilder longExchangeName = new StringBuilder();
+ for (int i = 0; i < 50; i++)
+ {
+ longExchangeName.append("abcde");
+ }
+ return longExchangeName;
+ }
+
+ public void testNoRouteForNonMandatoryMessage_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(true);
+
+ Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ String testQueueName = getTestQueueName();
+ MessageProducer nonMandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer(
+ transactedSession.createQueue(testQueueName),
+ false, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ nonMandatoryProducer.send(message);
+
+ // should succeed - the message is simply discarded
+ transactedSession.commit();
+
+ _testExceptionListener.assertNoException();
+ }
+
+
+ public void testNoRouteOnNonTransactionalSession_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(true);
+
+ Session nonTransactedSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String testQueueName = getTestQueueName();
+ MessageProducer mandatoryProducer = ((AMQSession<?, ?>) nonTransactedSession).createProducer(
+ nonTransactedSession.createQueue(testQueueName),
+ true, // mandatory
+ false); // immediate
+
+ Message message = nonTransactedSession.createMessage();
+ mandatoryProducer.send(message);
+
+ // should succeed - the message is asynchronously bounced back to the exception listener
+ message.acknowledge();
+
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+ }
+
+ public void testClientDisablesCloseOnNoRoute_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+ {
+ createConnectionWithCloseWhenNoRoute(false);
+
+ Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ String testQueueName = getTestQueueName();
+ MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer(
+ transactedSession.createQueue(testQueueName),
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+ transactedSession.commit();
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+ }
+
+ private void createConnectionWithCloseWhenNoRoute(boolean closeWhenNoRoute) throws URLSyntaxException, NamingException, JMSException
+ {
+ Map<String, String> options = new HashMap<String, String>();
+ options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(closeWhenNoRoute));
+ _connection = getConnectionWithOptions(options);
+ _connection.setExceptionListener(_testExceptionListener);
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
index b746a5b09e..d012b9abbb 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java
@@ -18,34 +18,34 @@
*/
package org.apache.qpid.test.client;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
+
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase implements ExceptionListener
+/**
+ * @see CloseOnNoRouteForMandatoryMessageTest for related tests
+ */
+public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase
{
private Connection _connection;
- private BlockingQueue<JMSException> _exceptions;
+ private UnroutableMessageTestExceptionListener _testExceptionListener = new UnroutableMessageTestExceptionListener();
+ @Override
public void setUp() throws Exception
{
+ getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
super.setUp();
- _exceptions = new ArrayBlockingQueue<JMSException>(1);
_connection = getConnection();
- _connection.setExceptionListener(this);
+ _connection.setExceptionListener(_testExceptionListener);
}
public void testPublishP2PWithNoConsumerAndImmediateOnAndAutoAck() throws Exception
@@ -103,14 +103,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
consumerCreateAndClose(true, false);
Message message = produceMessage(Session.AUTO_ACKNOWLEDGE, true, false, true);
-
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
}
private void publishIntoExistingDestinationWithNoConsumerAndImmediateOn(int acknowledgeMode, boolean pubSub)
@@ -120,27 +113,14 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
Message message = produceMessage(acknowledgeMode, pubSub, false, true);
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoConsumersException noConsumerException = (AMQNoConsumersException) exception.getLinkedException();
- assertNotNull("AMQNoConsumersException should be linked to JMSEXception", noConsumerException);
- Message bounceMessage = (Message) noConsumerException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoConsumersWithReturnedMessage(message);
}
private void publishWithMandatoryOnImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException,
InterruptedException
{
Message message = produceMessage(acknowledgeMode, pubSub, true, false);
-
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
}
private void publishWithMandatoryOffImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException,
@@ -148,8 +128,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
{
produceMessage(acknowledgeMode, pubSub, false, false);
- JMSException exception = _exceptions.poll(1, TimeUnit.SECONDS);
- assertNull("Unexpected JMSException", exception);
+ _testExceptionListener.assertNoException();
}
private void consumerCreateAndClose(boolean pubSub, boolean durable) throws JMSException
@@ -210,41 +189,26 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
Message message = session.createMessage();
producer.send(message);
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
producer = session.createProducer(null);
message = session.createMessage();
producer.send(session.createQueue(getTestQueueName()), message);
- exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
-
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
// publish to non-existent topic - should get no failure
producer = session.createProducer(session.createTopic(getTestQueueName()));
message = session.createMessage();
producer.send(message);
- exception = _exceptions.poll(1, TimeUnit.SECONDS);
- assertNull("Unexpected JMSException", exception);
+ _testExceptionListener.assertNoException();
producer = session.createProducer(null);
message = session.createMessage();
producer.send(session.createTopic(getTestQueueName()), message);
- exception = _exceptions.poll(1, TimeUnit.SECONDS);
- assertNull("Unexpected JMSException", exception);
+ _testExceptionListener.assertNoException();
session.close();
}
@@ -260,13 +224,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
Message message = session.createMessage();
producer.send(message);
- JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS);
- assertNotNull("JMSException is expected", exception);
- AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
- assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException);
- Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
- assertNotNull("Bounced Message is expected", bounceMessage);
- assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
// now set topic specific system property to false - should no longer get mandatory failure on new producer
setTestClientSystemProperty("qpid.default_mandatory_topic","false");
@@ -274,17 +232,6 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl
message = session.createMessage();
producer.send(session.createTopic(getTestQueueName()), message);
- exception = _exceptions.poll(1, TimeUnit.SECONDS);
- if(exception != null)
- {
- exception.printStackTrace();
- }
- assertNull("Unexpected JMSException", exception);
-
- }
-
- public void onException(JMSException exception)
- {
- _exceptions.add(exception);
+ _testExceptionListener.assertNoException();
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java b/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java
new file mode 100644
index 0000000000..7ba0bd17f3
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.test.client;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * Provides utility methods for checking exceptions that are thrown on the client side when a message is
+ * not routable.
+ *
+ * Exception objects are passed either explicitly as method parameters or implicitly
+ * by previously doing {@link Connection#setExceptionListener(ExceptionListener)}.
+ */
+public class UnroutableMessageTestExceptionListener implements ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(UnroutableMessageTestExceptionListener.class);
+
+ /**
+ * Number of seconds to check for an event that should should NOT happen
+ */
+ private static final int NEGATIVE_TIMEOUT = 2;
+
+ /**
+ * Number of seconds to keep checking for an event that should should happen
+ */
+ private static final int POSITIVE_TIMEOUT = 30;
+
+ private BlockingQueue<JMSException> _exceptions = new ArrayBlockingQueue<JMSException>(1);
+
+ @Override
+ public void onException(JMSException e)
+ {
+ _logger.info("Received exception " + e);
+ _exceptions.add(e);
+ }
+
+ public void assertReceivedNoRouteWithReturnedMessage(Message message, String intendedQueueName)
+ {
+ JMSException exception = getReceivedException();
+ assertNoRouteExceptionWithReturnedMessage(exception, message, intendedQueueName);
+ }
+
+ public void assertReceivedNoRoute(String intendedQueueName)
+ {
+ JMSException exception = getReceivedException();
+ assertNoRoute(exception, intendedQueueName);
+ }
+
+ public void assertReceivedNoConsumersWithReturnedMessage(Message message)
+ {
+ JMSException exception = getReceivedException();
+ AMQNoConsumersException noConsumersException = (AMQNoConsumersException) exception.getLinkedException();
+ assertNotNull("AMQNoConsumersException should be linked to JMSException", noConsumersException);
+ Message bounceMessage = (Message) noConsumersException.getUndeliveredMessage();
+ assertNotNull("Bounced Message is expected", bounceMessage);
+
+ try
+ {
+ assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("Couldn't check exception", e);
+ }
+ }
+
+ public void assertReceivedReturnedMessageWithLongExceptionMessage(Message message, AMQQueue queue)
+ {
+ JMSException exception = getReceivedException();
+ assertNoRouteException(exception, message);
+ AMQShortString exchangeName = queue.getExchangeName();
+ String expectedMessage = "Error: No Route for message [Exchange: " + exchangeName.asString().substring(0, 220) + "...";
+ assertTrue("Unexpected exception message: " + exception.getMessage(), exception.getMessage().contains(expectedMessage));
+ }
+
+ public void assertNoRouteExceptionWithReturnedMessage(
+ JMSException exception, Message message, String intendedQueueName)
+ {
+ assertNoRoute(exception, intendedQueueName);
+
+ assertNoRouteException(exception, message);
+ }
+
+ private void assertNoRouteException(JMSException exception, Message message)
+ {
+ AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
+ assertNotNull("AMQNoRouteException should be linked to JMSException", noRouteException);
+ Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
+ assertNotNull("Bounced Message is expected", bounceMessage);
+
+ try
+ {
+ assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("Couldn't check exception", e);
+ }
+ }
+
+ public void assertNoRoute(JMSException exception, String intendedQueueName)
+ {
+ assertTrue(
+ exception + " message should contain intended queue name",
+ exception.getMessage().contains(intendedQueueName));
+
+ AMQException noRouteException = (AMQException) exception.getLinkedException();
+ assertNotNull("AMQException should be linked to JMSException", noRouteException);
+
+ assertEquals(AMQConstant.NO_ROUTE, noRouteException.getErrorCode());
+ assertTrue(
+ "Linked exception " + noRouteException + " message should contain intended queue name",
+ noRouteException.getMessage().contains(intendedQueueName));
+ }
+
+
+ public void assertNoException()
+ {
+ try
+ {
+ assertNull("Unexpected JMSException", _exceptions.poll(NEGATIVE_TIMEOUT, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException("Couldn't check exception", e);
+ }
+ }
+
+ private JMSException getReceivedException()
+ {
+ try
+ {
+ JMSException exception = _exceptions.poll(POSITIVE_TIMEOUT, TimeUnit.SECONDS);
+ assertNotNull("JMSException is expected", exception);
+ return exception;
+ }
+ catch(InterruptedException e)
+ {
+ throw new RuntimeException("Couldn't check exception", e);
+ }
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
index 4dc26847da..f5a234163d 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
@@ -26,6 +26,8 @@ import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
+import org.apache.qpid.server.model.Broker;
+
/**
* This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
* is set for a virtual host.
@@ -39,6 +41,9 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase
protected void configure() throws Exception
{
+ // switch off connection close in order to test timeout on publishing of unroutable messages
+ getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false);
+
// Setup housekeeping every 100ms
setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100");
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index e61efd3e32..67e3e3b8f1 100755
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -1123,7 +1123,7 @@ public class QpidBrokerTestCase extends QpidTestCase
public Connection getConnection(ConnectionURL url) throws JMSException
{
- _logger.info(url.getURL());
+ _logger.debug("get connection for " + url.getURL());
Connection connection = new AMQConnectionFactory(url).createConnection(url.getUsername(), url.getPassword());
_connections.add(connection);
@@ -1143,16 +1143,16 @@ public class QpidBrokerTestCase extends QpidTestCase
*/
public Connection getConnection(String username, String password) throws JMSException, NamingException
{
- _logger.info("get connection");
+ _logger.debug("get connection for username " + username);
Connection con = getConnectionFactory().createConnection(username, password);
//add the connection in the list of connections
_connections.add(con);
return con;
}
- public Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException
+ protected Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException
{
- _logger.info("get Connection");
+ _logger.debug("get connection for id " + id);
Connection con = getConnectionFactory().createConnection(username, password, id);
//add the connection in the list of connections
_connections.add(con);
@@ -1160,6 +1160,19 @@ public class QpidBrokerTestCase extends QpidTestCase
}
/**
+ * Useful, for example, to avoid the connection being automatically closed in {@link #tearDown()}
+ * if it has deliberately been put into an error state already.
+ */
+ protected void forgetConnection(Connection connection)
+ {
+ _logger.debug("Forgetting about connection " + connection);
+ boolean removed = _connections.remove(connection);
+ assertTrue(
+ "The supplied connection " + connection + " should have been one that I already know about",
+ removed);
+ }
+
+ /**
* Return a uniqueName for this test.
* In this case it returns a queue Named by the TestCase and TestName
*
@@ -1190,6 +1203,7 @@ public class QpidBrokerTestCase extends QpidTestCase
return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, getTestQueueName());
}
+ @Override
protected void tearDown() throws java.lang.Exception
{
super.tearDown();
diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes
index 6ae5a40f89..96f90701fd 100755
--- a/java/test-profiles/Java010Excludes
+++ b/java/test-profiles/Java010Excludes
@@ -17,8 +17,10 @@
// under the License.
//
-// Those tests are testing 0.8 specific semantics
+// Those tests are testing 0.8..-0-9-1 specific semantics
org.apache.qpid.test.client.ImmediateAndMandatoryPublishingTest#*
+org.apache.qpid.test.client.CloseOnNoRouteForMandatoryMessageTest#*
+org.apache.qpid.systest.rest.BrokerRestTest#testSetCloseOnNoRoute
//this test checks explicitly for 0-8 flow control semantics
org.apache.qpid.test.client.FlowControlTest#*