diff options
author | Alex Rudyy <orudyy@apache.org> | 2013-06-21 17:06:57 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2013-06-21 17:06:57 +0000 |
commit | 4b461c11af414b57735df739d96a7d2a78385f99 (patch) | |
tree | a168a402c5f8af6104a6e0bf9be6390262572590 /java | |
parent | 5e17dc3e784dc15a9a3ce588317d0fe6ada2e52a (diff) | |
download | qpid-python-4b461c11af414b57735df739d96a7d2a78385f99.tar.gz |
QPID-4943: Introduce a feature for 0-8/0-9/0-9-1 protocols to close a connection on receiving a mandatory unroutable message in a transacted session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1495511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
24 files changed, 948 insertions, 112 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8d88ee902a..8588aea2d4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -324,14 +325,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { if(destinationQueues == null || destinationQueues.isEmpty()) { - if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) - { - _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage)); - } - else - { - _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); - } + handleUnroutableMessage(); } else { @@ -378,6 +372,61 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } + /** + * Either throws a {@link AMQConnectionException} or returns the message + * + * Pre-requisite: the current message is judged to have no destination queues. + * + * @throws AMQConnectionException if the message is mandatoryclose-on-no-route + * @see AMQProtocolSession#isCloseWhenNoRoute() + */ + private void handleUnroutableMessage() throws AMQConnectionException + { + boolean mandatory = _currentMessage.isMandatory(); + String description = currentMessageDescription(); + boolean closeOnNoRoute = _session.isCloseWhenNoRoute(); + + if(_logger.isDebugEnabled()) + { + _logger.debug(String.format( + "Unroutable message %s, mandatory=%s, transactionalSession=%s, closeOnNoRoute=%s", + description, mandatory, isTransactional(), closeOnNoRoute)); + } + + if (mandatory && isTransactional() && _session.isCloseWhenNoRoute()) + { + throw new AMQConnectionException( + AMQConstant.NO_ROUTE, + "No route for message " + currentMessageDescription(), + 0, 0, // default class and method ids + getProtocolSession().getProtocolVersion().getMajorVersion(), + getProtocolSession().getProtocolVersion().getMinorVersion(), + (Throwable) null); + } + + if (mandatory || _currentMessage.isImmediate()) + { + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), _currentMessage)); + } + else + { + _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); + } + } + + private String currentMessageDescription() + { + if(_currentMessage == null || !_currentMessage.allContentReceived()) + { + throw new IllegalStateException("Cannot create message description for message: " + _currentMessage); + } + + return String.format( + "[Exchange: %s, Routing key: %s]", + _currentMessage.getExchange(), + _currentMessage.getRoutingKey()); + } + public void publishContentBody(ContentBody contentBody) throws AMQException { if (_currentMessage == null) @@ -522,6 +571,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * * @throws AMQException if there is an error during closure */ + @Override public void close() throws AMQException { if(!_closing.compareAndSet(false, true)) @@ -1344,7 +1394,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _message, _channelId, _errorCode.getCode(), - new AMQShortString(_description)); + AMQShortString.valueOf(_description, true, true)); } catch (AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java index 24fd687240..8f565362b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java @@ -73,6 +73,7 @@ public interface Broker extends ConfiguredObject String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit"; String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay"; + String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute"; String VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD = "virtualhost.housekeepingCheckPeriod"; String VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "virtualhost.storeTransactionIdleTimeoutClose"; @@ -113,6 +114,7 @@ public interface Broker extends ConfiguredObject VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, CONNECTION_SESSION_COUNT_LIMIT, CONNECTION_HEART_BEAT_DELAY, + CONNECTION_CLOSE_WHEN_NO_ROUTE, STATISTICS_REPORTING_PERIOD, STATISTICS_REPORTING_RESET_ENABLED, STORE_TYPE, diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 678db43d58..ff9cac9a21 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -92,6 +92,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat put(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); put(CONNECTION_SESSION_COUNT_LIMIT, Integer.class); put(CONNECTION_HEART_BEAT_DELAY, Integer.class); + put(CONNECTION_CLOSE_WHEN_NO_ROUTE, Boolean.class); put(STATISTICS_REPORTING_PERIOD, Integer.class); put(NAME, String.class); @@ -124,6 +125,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 0l; public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l; public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 0l; + public static final boolean DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE = true; @SuppressWarnings("serial") private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{ @@ -141,6 +143,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD); put(Broker.CONNECTION_HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY); put(Broker.CONNECTION_SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT); + put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE); put(Broker.NAME, DEFAULT_NAME); put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE); put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index bbf90fad86..92d6683415 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -27,8 +27,10 @@ import java.nio.ByteBuffer; import java.security.Principal; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; @@ -39,6 +41,7 @@ import java.util.concurrent.locks.ReentrantLock; import javax.security.auth.Subject; import javax.security.sasl.SaslServer; + import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; @@ -47,7 +50,24 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQProtocolHeaderException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -102,6 +122,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + /** + * The channels that the latest call to {@link #received(ByteBuffer)} applied to. + * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()} + * on after handling the frames. + * + * Thread-safety: guarded by {@link #_receivedLock}. + */ + private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>(); + private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); private final AMQStateManager _stateManager; @@ -160,6 +189,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Broker _broker; private final Transport _transport; + private volatile boolean _closeWhenNoRoute; public AMQProtocolEngine(Broker broker, NetworkConnection network, @@ -184,6 +214,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false)); + _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE); + initialiseStatistics(); } @@ -253,17 +285,26 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { dataBlockReceived(dataBlock); } + catch(AMQConnectionException e) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e); + } + break; + } catch (Exception e) { _logger.error("Unexpected exception when processing datablock", e); closeProtocolSession(); + break; } } - receiveComplete(); + receivedComplete(); } catch (Exception e) { - _logger.error("Unexpected exception when processing datablock", e); + _logger.error("Unexpected exception when processing datablocks", e); closeProtocolSession(); } finally @@ -272,16 +313,45 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - private void receiveComplete() + private void receivedComplete() throws AMQException { - for (AMQChannel channel : _channelMap.values()) + Exception exception = null; + for (AMQChannel channel : _channelsForCurrentMessage) { - channel.receivedComplete(); + try + { + channel.receivedComplete(); + } + catch(Exception exceptionForThisChannel) + { + if(exception == null) + { + exception = exceptionForThisChannel; + } + _logger.error("Error informing channel that receiving is complete. Channel: " + channel, exceptionForThisChannel); + } } + _channelsForCurrentMessage.clear(); + + if(exception != null) + { + throw new AMQException( + AMQConstant.INTERNAL_ERROR, + "Error informing channel that receiving is complete: " + exception.getMessage(), + exception); + } } - public void dataBlockReceived(AMQDataBlock message) throws Exception + /** + * Process the data block. + * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}. + * + * @throws an AMQConnectionException if unable to process the data block. In this case, + * the connection is already closed by the time the exception is thrown. If any other + * type of exception is thrown, the connection is not already closed. + */ + private void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; if (message instanceof ProtocolInitiation) @@ -301,18 +371,40 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } + /** + * Handle the supplied frame. + * Adds this frame's channel to {@link #_channelsForCurrentMessage}. + * + * @throws an AMQConnectionException if unable to process the data block. In this case, + * the connection is already closed by the time the exception is thrown. If any other + * type of exception is thrown, the connection is not already closed. + */ private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); + AMQChannel amqChannel = _channelMap.get(channelId); + if(amqChannel != null) + { + // The _receivedLock is already aquired in the caller + // It is safe to add channel + _channelsForCurrentMessage.add(amqChannel); + } + else + { + // Not an error. The frame is probably a channel Open for this channel id, which + // does not require asynchronous work therefore its absence from + // _channelsForCurrentMessage is ok. + } + AMQBody body = frame.getBodyFrame(); //Look up the Channel's Actor and set that as the current actor // If that is not available then we can use the ConnectionActor // that is associated with this AMQMPSession. LogActor channelActor = null; - if (_channelMap.get(channelId) != null) + if (amqChannel != null) { - channelActor = _channelMap.get(channelId).getLogActor(); + channelActor = amqChannel.getLogActor(); } CurrentActor.set(channelActor == null ? _actor : channelActor); @@ -349,6 +441,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { body.handle(channelId, this); } + catch(AMQConnectionException e) + { + _logger.info(e.getMessage() + " whilst processing frame: " + body); + closeConnection(channelId, e); + throw e; + } catch (AMQException e) { closeChannel(channelId); @@ -400,6 +498,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi QpidProperties.getBuildVersion()); serverProperties.setString(ServerPropertyNames.QPID_INSTANCE_NAME, _broker.getName()); + serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, + String.valueOf(_closeWhenNoRoute)); AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), (short) pv.getActualMinorVersion(), @@ -720,6 +820,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid */ + @Override public void closeChannel(int channelId) throws AMQException { final AMQChannel channel = getChannel(channelId); @@ -819,12 +920,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } /** This must be called when the session is _closed in order to free up any resources managed by the session. */ + @Override public void closeSession() throws AMQException { if(_closing.compareAndSet(false,true)) { // force sync of outstanding async work - receiveComplete(); + _receivedLock.lock(); + try + { + receivedComplete(); + } + finally + { + _receivedLock.unlock(); + } // REMOVE THIS SHOULD NOT BE HERE. if (CurrentActor.get() == null) @@ -900,6 +1010,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } + @Override public void closeProtocolSession() { _network.close(); @@ -968,6 +1079,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _clientProperties = clientProperties; if (_clientProperties != null) { + Boolean closeWhenNoRoute = _clientProperties.getBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); + if (closeWhenNoRoute != null) + { + _closeWhenNoRoute = closeWhenNoRoute; + if(_logger.isDebugEnabled()) + { + _logger.debug("Client set closeWhenNoRoute=" + _closeWhenNoRoute + " for protocol engine " + this); + } + } + _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8); if (_clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8) != null) @@ -1538,4 +1659,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { return _lastWriteTime.get(); } + + @Override + public boolean isCloseWhenNoRoute() + { + return _closeWhenNoRoute; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index ba806c04bd..1842117d6f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -218,4 +218,11 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth public Principal getPeerPrincipal(); Lock getReceivedLock(); + + /** + * Used for 0-8/0-9/0-9-1 connections to choose to close + * the connection when a transactional session receives a 'mandatory' message which + * can't be routed rather than returning the message. + */ + boolean isCloseWhenNoRoute(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 8de19d9cff..1c8939d117 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -73,6 +73,7 @@ public class BrokerTestHelper when(subjectCreator.getMechanisms()).thenReturn(""); Broker broker = mock(Broker.class); when(broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT)).thenReturn(1); + when(broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(false); when(broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(10000l); when(broker.getId()).thenReturn(UUID.randomUUID()); when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4e885258b9..74c9878a8e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -844,7 +844,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - public void close() throws JMSException + public void close() throws JMSException { close(DEFAULT_TIMEOUT); } @@ -859,9 +859,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!setClosed()) { setClosing(true); - try{ + try + { doClose(sessions, timeout); - }finally{ + } + finally + { setClosing(false); } } @@ -1594,4 +1597,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _validateQueueOnSend; } + + @Override + protected boolean setClosed() + { + return super.setClosed(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java b/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java new file mode 100644 index 0000000000..baae072167 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client.handler; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.properties.ConnectionStartProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used during connection establishment to optionally set the "close when no route" client property + */ +class CloseWhenNoRouteSettingsHelper +{ + private static final Logger _log = LoggerFactory.getLogger(CloseWhenNoRouteSettingsHelper.class); + + /** + * @param url the client's connection URL which may contain the option + * {@value ConnectionStartProperties#QPID_CLOSE_WHEN_NO_ROUTE} + * @param serverProperties the properties received from the broker which may contain the option + * {@value ConnectionStartProperties#QPID_CLOSE_WHEN_NO_ROUTE} + * @param clientProperties the client properties to optionally set the close-when-no-route option on + */ + public void setClientProperties(FieldTable clientProperties, ConnectionURL url, FieldTable serverProperties) + { + boolean brokerSupportsCloseWhenNoRoute = + serverProperties != null && serverProperties.containsKey(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); + boolean brokerCloseWhenNoRoute = brokerSupportsCloseWhenNoRoute && + Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE)); + + String closeWhenNoRouteOption = url.getOption(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE); + if(closeWhenNoRouteOption != null) + { + if(brokerSupportsCloseWhenNoRoute) + { + boolean desiredCloseWhenNoRoute = Boolean.valueOf(closeWhenNoRouteOption); + if(desiredCloseWhenNoRoute != brokerCloseWhenNoRoute) + { + clientProperties.setBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, desiredCloseWhenNoRoute); + _log.debug( + "Set client property {} to {}", + ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, desiredCloseWhenNoRoute); + } + else + { + _log.debug( + "Client's desired {} value {} already matches the server's", + ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, desiredCloseWhenNoRoute); + } + } + else + { + _log.warn("The broker being connected to does not support the " + ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE + " option"); + } + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 66c4821f60..366b5f115e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -37,6 +37,7 @@ import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.properties.ConnectionStartProperties; import javax.security.sasl.Sasl; @@ -51,6 +52,8 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co private static final ConnectionStartMethodHandler _instance = new ConnectionStartMethodHandler(); + private final CloseWhenNoRouteSettingsHelper _closeWhenNoRouteHelper = new CloseWhenNoRouteSettingsHelper(); + public static ConnectionStartMethodHandler getInstance() { return _instance; @@ -59,6 +62,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co private ConnectionStartMethodHandler() { } + @Override public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId) throws AMQException { @@ -147,6 +151,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co } session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED); + FieldTable clientProperties = FieldTableFactory.newFieldTable(); clientProperties.setString(ConnectionStartProperties.CLIENT_ID_0_8, @@ -162,12 +167,16 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co clientProperties.setInteger(ConnectionStartProperties.PID, ConnectionStartProperties.getPID()); + FieldTable serverProperties = body.getServerProperties(); + ConnectionURL url = getConnectionURL(session); + _closeWhenNoRouteHelper.setClientProperties(clientProperties, url, serverProperties); + ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales)); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. session.writeFrame(connectionStartOkBody.generateFrame(channelId)); - + } catch (UnsupportedEncodingException e) { @@ -195,7 +204,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co try { AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism); - instance.initialise(protocolSession.getAMQConnection().getConnectionURL()); + instance.initialise(getConnectionURL(protocolSession)); return instance; } @@ -205,4 +214,8 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co } } + private ConnectionURL getConnectionURL(AMQProtocolSession protocolSession) + { + return protocolSession.getAMQConnection().getConnectionURL(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index c4fbeb5607..3050e84419 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -57,6 +57,19 @@ public interface ConnectionURL * the DLQ (or dropped) when delivery count exceeds the maximum. */ public static final String OPTIONS_REJECT_BEHAVIOUR = "rejectbehaviour"; + + /** + * <p> + * This option is only applicable for 0-8/0-9/0-9-1 protocol connections. + * </p> + * <p> + * It tells the client to request whether the broker should close the + * connection when a mandatory message isn't routable, rather than return + * the message to the client as it normally would. + * </p> + */ + public static final String OPTIONS_CLOSE_WHEN_NO_ROUTE = "closeWhenNoRoute"; + public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange"; public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; diff --git a/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java b/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java new file mode 100644 index 0000000000..f1d7a76c75 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client.handler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.test.utils.QpidTestCase; + +public class CloseWhenNoRouteSettingsHelperTest extends QpidTestCase +{ + private static final String FALSE_STR = Boolean.toString(false); + private static final String TRUE_STR = Boolean.toString(true); + + private final CloseWhenNoRouteSettingsHelper _closeWhenNoRouteSettingsHelper = new CloseWhenNoRouteSettingsHelper(); + + public void testCloseWhenNoRouteNegotiation() + { + test("Nothing should be set if option not in URL", + null, + true, + null); + test("Client should disable broker's enabled option", + FALSE_STR, + true, + false); + test("Client should be able to disable broker's enabled option", + TRUE_STR, + false, + true); + test("Client should not enable option if unsupported by broker", + TRUE_STR, + null, + null); + test("Malformed client option should evaluate to false", + "malformed boolean", + true, + false); + } + + private void test(String message, String urlOption, Boolean serverOption, Boolean expectedClientProperty) + { + ConnectionURL url = mock(ConnectionURL.class); + when(url.getOption(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE)).thenReturn(urlOption); + + FieldTable serverProperties = new FieldTable(); + if(serverOption != null) + { + serverProperties.setBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, serverOption); + } + + FieldTable clientProperties = new FieldTable(); + + _closeWhenNoRouteSettingsHelper.setClientProperties(clientProperties, url, serverProperties); + + assertEquals(message, expectedClientProperty, clientProperties.getBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE)); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java index 2d54e35191..40ecc3a946 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -149,10 +149,6 @@ public class AMQException extends Exception public AMQShortString getMessageAsShortString() { String message = getMessage(); - if (message != null && message.length() > AMQShortString.MAX_LENGTH) - { - message = message.substring(0, AMQShortString.MAX_LENGTH - 3) + "..."; - } - return new AMQShortString(message); + return AMQShortString.valueOf(message, true, true); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index b577c916c6..4adc59b158 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -794,9 +794,30 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return false; //To change body of created methods use File | Settings | File Templates. } + public static AMQShortString valueOf(Object obj, boolean truncate, boolean nullAsEmptyString) + { + if (obj == null) + { + if (nullAsEmptyString) + { + return EMPTY_STRING; + } + return null; + } + else + { + String value = String.valueOf(obj); + if (truncate && value.length() > AMQShortString.MAX_LENGTH) + { + value = value.substring(0, AMQShortString.MAX_LENGTH - 3) + "..."; + } + return valueOf(value); + } + } + public static AMQShortString valueOf(Object obj) { - return obj == null ? null : AMQShortString.valueOf(String.valueOf(obj)); + return valueOf(obj, false, false); } public static AMQShortString valueOf(String obj) diff --git a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java index 59a1b6c5b0..6f9d872f98 100644 --- a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java +++ b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java @@ -26,13 +26,20 @@ import java.lang.management.RuntimeMXBean; import org.apache.qpid.transport.util.Logger; /** - * Constants for the various properties 0-10 clients can + * Constants for the various properties clients can * set values for during the ConnectionStartOk reply. */ public class ConnectionStartProperties { private static final Logger LOGGER = Logger.get(ConnectionStartProperties.class); + /** + * Used for 0-8/0-9/0-9-1 connections to choose to close + * the connection when a transactional session receives a 'mandatory' message which + * can't be routed rather than returning the message. + */ + public static final String QPID_CLOSE_WHEN_NO_ROUTE = "qpid.close_when_no_route"; + public static final String CLIENT_ID_0_10 = "clientName"; public static final String CLIENT_ID_0_8 = "instance"; diff --git a/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java index 0f8fbf0685..61ac04213e 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java @@ -300,6 +300,32 @@ public class AMQShortStringTest extends TestCase assertEquals("join result differs from expected", expected.toString(), result.asString()); } + public void testValueOf() + { + String string = buildString('a', 255); + AMQShortString shortString = AMQShortString.valueOf(string, true, true); + assertEquals("Unexpected string from valueOf", string, shortString.asString()); + } + + public void testValueOfTruncated() + { + String string = buildString('a', 256); + AMQShortString shortString = AMQShortString.valueOf(string, true, true); + assertEquals("Unexpected truncated string from valueOf", string.substring(0, AMQShortString.MAX_LENGTH -3) + "...", shortString.asString()); + } + + public void testValueOfNulAsEmptyString() + { + AMQShortString shortString = AMQShortString.valueOf(null, true, true); + assertEquals("Unexpected empty string from valueOf", AMQShortString.EMPTY_STRING, shortString); + } + + public void testValueOfNullAsNull() + { + AMQShortString shortString = AMQShortString.valueOf(null, true, false); + assertEquals("Unexpected null string from valueOf", null, shortString); + } + /** * A helper method to generate a string with given length containing given * character diff --git a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java index 8324ac74a5..00c85e80c8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java @@ -19,6 +19,8 @@ package org.apache.qpid.server.security.acl; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; @@ -306,7 +308,11 @@ public class ExternalACLTest extends AbstractACLTestCase conn.start(); - MessageProducer sender = sess.createProducer(sess.createQueue("example.RequestQueue")); + Queue queue = sess.createQueue("example.RequestQueue"); + + ((AMQSession<?,?>)sess).declareAndBind((AMQDestination)queue); + + MessageProducer sender = sess.createProducer(queue); sender.send(sess.createTextMessage("test")); @@ -316,7 +322,6 @@ public class ExternalACLTest extends AbstractACLTestCase conn.close(); } - public void setUpRequestResponseSuccess() throws Exception { // The group "messaging-users", referenced in the ACL below, is currently defined diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java index 99ee99e8c5..cc662bddca 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java @@ -14,6 +14,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -40,6 +41,9 @@ public class ExchangeManagementTest extends QpidBrokerTestCase { getBrokerConfiguration().addJmxManagementConfiguration(); + // to test exchange selectors the publishing of unroutable messages should be allowed + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); + _jmxUtils = new JMXTestUtils(this); super.setUp(); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java index 0d40eca745..1d2f6e3427 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java @@ -27,6 +27,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator; import org.apache.qpid.server.model.Broker; @@ -35,7 +40,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.client.UnroutableMessageTestExceptionListener; import org.apache.qpid.test.utils.TestBrokerConfiguration; public class BrokerRestTest extends QpidRestTestCase @@ -160,6 +165,33 @@ public class BrokerRestTest extends QpidRestTestCase assertEquals("Unexpected update response for flow resume size > flow size", 409, response); } + public void testSetCloseOnNoRoute() throws Exception + { + Map<String, Object> brokerDetails = getRestTestHelper().getJsonAsSingletonList("/rest/broker"); + assertTrue("closeOnNoRoute should be true", (Boolean)brokerDetails.get(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)); + + Map<String, Object> brokerAttributes = new HashMap<String, Object>(); + brokerAttributes.put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); + + int response = getRestTestHelper().submitRequest("/rest/broker", "PUT", brokerAttributes); + assertEquals("Unexpected update response", 200, response); + + brokerDetails = getRestTestHelper().getJsonAsSingletonList("/rest/broker"); + assertFalse("closeOnNoRoute should be false", (Boolean)brokerDetails.get(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)); + + Connection connection = getConnection(); + UnroutableMessageTestExceptionListener exceptionListener = new UnroutableMessageTestExceptionListener(); + connection.setExceptionListener(exceptionListener); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(getTestQueue()); + TextMessage message = session.createTextMessage("Test"); + producer.send(message); + + session.commit(); + + exceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); + } + private Map<String, Object> getValidBrokerAttributes() { Map<String, Object> brokerAttributes = new HashMap<String, Object>(); diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java new file mode 100644 index 0000000000..a8a72c20fc --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/CloseOnNoRouteForMandatoryMessageTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.client; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +/** + * Tests the broker's connection-closing behaviour when it receives an unroutable message + * on a transactional session. + * + * @see ImmediateAndMandatoryPublishingTest for more general tests of mandatory and immediate publishing + */ +public class CloseOnNoRouteForMandatoryMessageTest extends QpidBrokerTestCase +{ + private Connection _connection; + private UnroutableMessageTestExceptionListener _testExceptionListener = new UnroutableMessageTestExceptionListener(); + + @Override + public void setUp() throws Exception + { + super.setUp(); + } + + public void testNoRoute_brokerClosesConnection() throws Exception + { + createConnectionWithCloseWhenNoRoute(true); + + Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + String testQueueName = getTestQueueName(); + MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer( + transactedSession.createQueue(testQueueName), + true, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + mandatoryProducer.send(message); + try + { + transactedSession.commit(); + fail("Expected exception not thrown"); + } + catch(JMSException e) + { + _testExceptionListener.assertNoRoute(e, testQueueName); + } + _testExceptionListener.assertReceivedNoRoute(testQueueName); + + forgetConnection(_connection); + } + + public void testCloseOnNoRouteWhenExceptionMessageLengthIsGreater255() throws Exception + { + createConnectionWithCloseWhenNoRoute(true); + + AMQSession<?, ?> transactedSession = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED); + + StringBuilder longExchangeName = getLongExchangeName(); + + AMQShortString exchangeName = new AMQShortString(longExchangeName.toString()); + transactedSession.declareExchange(exchangeName, new AMQShortString("direct"), false); + + Destination testQueue = new AMQQueue(exchangeName, getTestQueueName()); + MessageProducer mandatoryProducer = transactedSession.createProducer( + testQueue, + true, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + mandatoryProducer.send(message); + try + { + transactedSession.commit(); + fail("Expected exception not thrown"); + } + catch (JMSException e) + { + AMQException noRouteException = (AMQException) e.getLinkedException(); + assertNotNull("AMQException should be linked to JMSException", noRouteException); + + assertEquals(AMQConstant.NO_ROUTE, noRouteException.getErrorCode()); + String expectedMessage = "Error: No route for message [Exchange: " + longExchangeName.substring(0, 220) + "..."; + assertEquals("Unexpected exception message: " + noRouteException.getMessage(), expectedMessage, + noRouteException.getMessage()); + } + finally + { + forgetConnection(_connection); + } + } + + public void testNoRouteMessageReurnedWhenExceptionMessageLengthIsGreater255() throws Exception + { + createConnectionWithCloseWhenNoRoute(false); + + AMQSession<?, ?> transactedSession = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED); + + StringBuilder longExchangeName = getLongExchangeName(); + + AMQShortString exchangeName = new AMQShortString(longExchangeName.toString()); + transactedSession.declareExchange(exchangeName, new AMQShortString("direct"), false); + + AMQQueue testQueue = new AMQQueue(exchangeName, getTestQueueName()); + MessageProducer mandatoryProducer = transactedSession.createProducer( + testQueue, + true, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + mandatoryProducer.send(message); + transactedSession.commit(); + _testExceptionListener.assertReceivedReturnedMessageWithLongExceptionMessage(message, testQueue); + } + + private StringBuilder getLongExchangeName() + { + StringBuilder longExchangeName = new StringBuilder(); + for (int i = 0; i < 50; i++) + { + longExchangeName.append("abcde"); + } + return longExchangeName; + } + + public void testNoRouteForNonMandatoryMessage_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception + { + createConnectionWithCloseWhenNoRoute(true); + + Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + String testQueueName = getTestQueueName(); + MessageProducer nonMandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer( + transactedSession.createQueue(testQueueName), + false, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + nonMandatoryProducer.send(message); + + // should succeed - the message is simply discarded + transactedSession.commit(); + + _testExceptionListener.assertNoException(); + } + + + public void testNoRouteOnNonTransactionalSession_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception + { + createConnectionWithCloseWhenNoRoute(true); + + Session nonTransactedSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String testQueueName = getTestQueueName(); + MessageProducer mandatoryProducer = ((AMQSession<?, ?>) nonTransactedSession).createProducer( + nonTransactedSession.createQueue(testQueueName), + true, // mandatory + false); // immediate + + Message message = nonTransactedSession.createMessage(); + mandatoryProducer.send(message); + + // should succeed - the message is asynchronously bounced back to the exception listener + message.acknowledge(); + + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); + } + + public void testClientDisablesCloseOnNoRoute_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception + { + createConnectionWithCloseWhenNoRoute(false); + + Session transactedSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + String testQueueName = getTestQueueName(); + MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer( + transactedSession.createQueue(testQueueName), + true, // mandatory + false); // immediate + + Message message = transactedSession.createMessage(); + mandatoryProducer.send(message); + transactedSession.commit(); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); + } + + private void createConnectionWithCloseWhenNoRoute(boolean closeWhenNoRoute) throws URLSyntaxException, NamingException, JMSException + { + Map<String, String> options = new HashMap<String, String>(); + options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(closeWhenNoRoute)); + _connection = getConnectionWithOptions(options); + _connection.setExceptionListener(_testExceptionListener); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java index b746a5b09e..d012b9abbb 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/ImmediateAndMandatoryPublishingTest.java @@ -18,34 +18,34 @@ */ package org.apache.qpid.test.client; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQNoRouteException; + import org.apache.qpid.client.AMQSession; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.test.utils.QpidBrokerTestCase; -public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase implements ExceptionListener +/** + * @see CloseOnNoRouteForMandatoryMessageTest for related tests + */ +public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase { private Connection _connection; - private BlockingQueue<JMSException> _exceptions; + private UnroutableMessageTestExceptionListener _testExceptionListener = new UnroutableMessageTestExceptionListener(); + @Override public void setUp() throws Exception { + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); super.setUp(); - _exceptions = new ArrayBlockingQueue<JMSException>(1); _connection = getConnection(); - _connection.setExceptionListener(this); + _connection.setExceptionListener(_testExceptionListener); } public void testPublishP2PWithNoConsumerAndImmediateOnAndAutoAck() throws Exception @@ -103,14 +103,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl consumerCreateAndClose(true, false); Message message = produceMessage(Session.AUTO_ACKNOWLEDGE, true, false, true); - - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); } private void publishIntoExistingDestinationWithNoConsumerAndImmediateOn(int acknowledgeMode, boolean pubSub) @@ -120,27 +113,14 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl Message message = produceMessage(acknowledgeMode, pubSub, false, true); - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoConsumersException noConsumerException = (AMQNoConsumersException) exception.getLinkedException(); - assertNotNull("AMQNoConsumersException should be linked to JMSEXception", noConsumerException); - Message bounceMessage = (Message) noConsumerException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoConsumersWithReturnedMessage(message); } private void publishWithMandatoryOnImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException, InterruptedException { Message message = produceMessage(acknowledgeMode, pubSub, true, false); - - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); } private void publishWithMandatoryOffImmediateOff(int acknowledgeMode, boolean pubSub) throws JMSException, @@ -148,8 +128,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl { produceMessage(acknowledgeMode, pubSub, false, false); - JMSException exception = _exceptions.poll(1, TimeUnit.SECONDS); - assertNull("Unexpected JMSException", exception); + _testExceptionListener.assertNoException(); } private void consumerCreateAndClose(boolean pubSub, boolean durable) throws JMSException @@ -210,41 +189,26 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl Message message = session.createMessage(); producer.send(message); - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); producer = session.createProducer(null); message = session.createMessage(); producer.send(session.createQueue(getTestQueueName()), message); - exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); - + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); // publish to non-existent topic - should get no failure producer = session.createProducer(session.createTopic(getTestQueueName())); message = session.createMessage(); producer.send(message); - exception = _exceptions.poll(1, TimeUnit.SECONDS); - assertNull("Unexpected JMSException", exception); + _testExceptionListener.assertNoException(); producer = session.createProducer(null); message = session.createMessage(); producer.send(session.createTopic(getTestQueueName()), message); - exception = _exceptions.poll(1, TimeUnit.SECONDS); - assertNull("Unexpected JMSException", exception); + _testExceptionListener.assertNoException(); session.close(); } @@ -260,13 +224,7 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl Message message = session.createMessage(); producer.send(message); - JMSException exception = _exceptions.poll(10, TimeUnit.SECONDS); - assertNotNull("JMSException is expected", exception); - AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); - assertNotNull("AMQNoRouteException should be linked to JMSEXception", noRouteException); - Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); - assertNotNull("Bounced Message is expected", bounceMessage); - assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName()); // now set topic specific system property to false - should no longer get mandatory failure on new producer setTestClientSystemProperty("qpid.default_mandatory_topic","false"); @@ -274,17 +232,6 @@ public class ImmediateAndMandatoryPublishingTest extends QpidBrokerTestCase impl message = session.createMessage(); producer.send(session.createTopic(getTestQueueName()), message); - exception = _exceptions.poll(1, TimeUnit.SECONDS); - if(exception != null) - { - exception.printStackTrace(); - } - assertNull("Unexpected JMSException", exception); - - } - - public void onException(JMSException exception) - { - _exceptions.add(exception); + _testExceptionListener.assertNoException(); } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java b/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java new file mode 100644 index 0000000000..7ba0bd17f3 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.client; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.assertTrue; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.protocol.AMQConstant; + +/** + * Provides utility methods for checking exceptions that are thrown on the client side when a message is + * not routable. + * + * Exception objects are passed either explicitly as method parameters or implicitly + * by previously doing {@link Connection#setExceptionListener(ExceptionListener)}. + */ +public class UnroutableMessageTestExceptionListener implements ExceptionListener +{ + private static final Logger _logger = Logger.getLogger(UnroutableMessageTestExceptionListener.class); + + /** + * Number of seconds to check for an event that should should NOT happen + */ + private static final int NEGATIVE_TIMEOUT = 2; + + /** + * Number of seconds to keep checking for an event that should should happen + */ + private static final int POSITIVE_TIMEOUT = 30; + + private BlockingQueue<JMSException> _exceptions = new ArrayBlockingQueue<JMSException>(1); + + @Override + public void onException(JMSException e) + { + _logger.info("Received exception " + e); + _exceptions.add(e); + } + + public void assertReceivedNoRouteWithReturnedMessage(Message message, String intendedQueueName) + { + JMSException exception = getReceivedException(); + assertNoRouteExceptionWithReturnedMessage(exception, message, intendedQueueName); + } + + public void assertReceivedNoRoute(String intendedQueueName) + { + JMSException exception = getReceivedException(); + assertNoRoute(exception, intendedQueueName); + } + + public void assertReceivedNoConsumersWithReturnedMessage(Message message) + { + JMSException exception = getReceivedException(); + AMQNoConsumersException noConsumersException = (AMQNoConsumersException) exception.getLinkedException(); + assertNotNull("AMQNoConsumersException should be linked to JMSException", noConsumersException); + Message bounceMessage = (Message) noConsumersException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + + try + { + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + } + catch (JMSException e) + { + throw new RuntimeException("Couldn't check exception", e); + } + } + + public void assertReceivedReturnedMessageWithLongExceptionMessage(Message message, AMQQueue queue) + { + JMSException exception = getReceivedException(); + assertNoRouteException(exception, message); + AMQShortString exchangeName = queue.getExchangeName(); + String expectedMessage = "Error: No Route for message [Exchange: " + exchangeName.asString().substring(0, 220) + "..."; + assertTrue("Unexpected exception message: " + exception.getMessage(), exception.getMessage().contains(expectedMessage)); + } + + public void assertNoRouteExceptionWithReturnedMessage( + JMSException exception, Message message, String intendedQueueName) + { + assertNoRoute(exception, intendedQueueName); + + assertNoRouteException(exception, message); + } + + private void assertNoRouteException(JMSException exception, Message message) + { + AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException(); + assertNotNull("AMQNoRouteException should be linked to JMSException", noRouteException); + Message bounceMessage = (Message) noRouteException.getUndeliveredMessage(); + assertNotNull("Bounced Message is expected", bounceMessage); + + try + { + assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID()); + } + catch (JMSException e) + { + throw new RuntimeException("Couldn't check exception", e); + } + } + + public void assertNoRoute(JMSException exception, String intendedQueueName) + { + assertTrue( + exception + " message should contain intended queue name", + exception.getMessage().contains(intendedQueueName)); + + AMQException noRouteException = (AMQException) exception.getLinkedException(); + assertNotNull("AMQException should be linked to JMSException", noRouteException); + + assertEquals(AMQConstant.NO_ROUTE, noRouteException.getErrorCode()); + assertTrue( + "Linked exception " + noRouteException + " message should contain intended queue name", + noRouteException.getMessage().contains(intendedQueueName)); + } + + + public void assertNoException() + { + try + { + assertNull("Unexpected JMSException", _exceptions.poll(NEGATIVE_TIMEOUT, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException("Couldn't check exception", e); + } + } + + private JMSException getReceivedException() + { + try + { + JMSException exception = _exceptions.poll(POSITIVE_TIMEOUT, TimeUnit.SECONDS); + assertNotNull("JMSException is expected", exception); + return exception; + } + catch(InterruptedException e) + { + throw new RuntimeException("Couldn't check exception", e); + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java index 4dc26847da..f5a234163d 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -26,6 +26,8 @@ import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; +import org.apache.qpid.server.model.Broker; + /** * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration * is set for a virtual host. @@ -39,6 +41,9 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase protected void configure() throws Exception { + // switch off connection close in order to test timeout on publishing of unroutable messages + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); + // Setup housekeeping every 100ms setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index e61efd3e32..67e3e3b8f1 100755 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -1123,7 +1123,7 @@ public class QpidBrokerTestCase extends QpidTestCase public Connection getConnection(ConnectionURL url) throws JMSException { - _logger.info(url.getURL()); + _logger.debug("get connection for " + url.getURL()); Connection connection = new AMQConnectionFactory(url).createConnection(url.getUsername(), url.getPassword()); _connections.add(connection); @@ -1143,16 +1143,16 @@ public class QpidBrokerTestCase extends QpidTestCase */ public Connection getConnection(String username, String password) throws JMSException, NamingException { - _logger.info("get connection"); + _logger.debug("get connection for username " + username); Connection con = getConnectionFactory().createConnection(username, password); //add the connection in the list of connections _connections.add(con); return con; } - public Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException + protected Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException { - _logger.info("get Connection"); + _logger.debug("get connection for id " + id); Connection con = getConnectionFactory().createConnection(username, password, id); //add the connection in the list of connections _connections.add(con); @@ -1160,6 +1160,19 @@ public class QpidBrokerTestCase extends QpidTestCase } /** + * Useful, for example, to avoid the connection being automatically closed in {@link #tearDown()} + * if it has deliberately been put into an error state already. + */ + protected void forgetConnection(Connection connection) + { + _logger.debug("Forgetting about connection " + connection); + boolean removed = _connections.remove(connection); + assertTrue( + "The supplied connection " + connection + " should have been one that I already know about", + removed); + } + + /** * Return a uniqueName for this test. * In this case it returns a queue Named by the TestCase and TestName * @@ -1190,6 +1203,7 @@ public class QpidBrokerTestCase extends QpidTestCase return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, getTestQueueName()); } + @Override protected void tearDown() throws java.lang.Exception { super.tearDown(); diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes index 6ae5a40f89..96f90701fd 100755 --- a/java/test-profiles/Java010Excludes +++ b/java/test-profiles/Java010Excludes @@ -17,8 +17,10 @@ // under the License. // -// Those tests are testing 0.8 specific semantics +// Those tests are testing 0.8..-0-9-1 specific semantics org.apache.qpid.test.client.ImmediateAndMandatoryPublishingTest#* +org.apache.qpid.test.client.CloseOnNoRouteForMandatoryMessageTest#* +org.apache.qpid.systest.rest.BrokerRestTest#testSetCloseOnNoRoute //this test checks explicitly for 0-8 flow control semantics org.apache.qpid.test.client.FlowControlTest#* |