diff options
Diffstat (limited to 'qpid/java')
46 files changed, 816 insertions, 353 deletions
diff --git a/qpid/java/broker/bin/qpid-server b/qpid/java/broker/bin/qpid-server index e5a9e998e2..738fc6e084 100755 --- a/qpid/java/broker/bin/qpid-server +++ b/qpid/java/broker/bin/qpid-server @@ -26,14 +26,19 @@ fi # Set classpath to include Qpid jar with all required jars in manifest QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/bdbstore-launch.jar +# Default Log4j to append to its log file +if [ -z "$QPID_LOG_APPEND" ]; then + export QPID_LOG_APPEND="true" +fi + # Set other variables used by the qpid-run script before calling export JAVA=java \ JAVA_VM=-server \ JAVA_MEM=-Xmx1024m \ JAVA_GC="-XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \ QPID_CLASSPATH=$QPID_LIBS \ - QPID_RUN_LOG=2 + QPID_RUN_LOG=2 -QPID_OPTS="$QPID_OPTS -Damqj.read_write_pool_size=32" +QPID_OPTS="$QPID_OPTS -Damqj.read_write_pool_size=32 -DQPID_LOG_APPEND=$QPID_LOG_APPEND" . qpid-run org.apache.qpid.server.Main "$@" diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml index a395d0fd56..8ca43ededd 100644 --- a/qpid/java/broker/etc/log4j.xml +++ b/qpid/java/broker/etc/log4j.xml @@ -50,7 +50,7 @@ <appender class="org.apache.log4j.FileAppender" name="FileAppender"> <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/> - <param name="Append" value="false"/> + <param name="Append" value="${QPID_LOG_APPEND}"/> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index fc667db17b..c5f5cd05e1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -204,11 +204,21 @@ public class HeadersExchange extends AbstractExchange for (int i = 0; i < bindings.length; i++) { String[] keyAndValue = bindings[i].split("="); - if (keyAndValue == null || keyAndValue.length < 2) + if (keyAndValue == null || keyAndValue.length == 0 || keyAndValue.length > 2) { throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" "); } - bindingMap.setString(keyAndValue[0], keyAndValue[1]); + + if(keyAndValue.length ==1) + { + //no value was given, only a key. Use an empty value + //to signal match on key presence alone + bindingMap.setString(keyAndValue[0], ""); + } + else + { + bindingMap.setString(keyAndValue[0], keyAndValue[1]); + } } _bindings.add(new Registration(new HeadersBinding(bindingMap), queue)); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/StartupRootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/StartupRootMessageLogger.java index 0dffde50fa..bfb122985b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/StartupRootMessageLogger.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/StartupRootMessageLogger.java @@ -39,4 +39,11 @@ public class StartupRootMessageLogger extends RootMessageLoggerImpl return true; } + @Override + public boolean isMessageEnabled(LogActor actor) + { + return true; + } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java index 42b3b05ac5..aea9ab43ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java @@ -379,6 +379,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry try { _cs.stop(); + CurrentActor.get().message(ManagementConsoleMessages.MNG_1003("RMI Registry", _cs.getAddress().getPort() - PORT_EXPORT_OFFSET)); + CurrentActor.get().message(ManagementConsoleMessages.MNG_1003("RMI ConnectorServer", _cs.getAddress().getPort())); } catch (IOException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java index f7e537b02b..a6fae053c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java @@ -120,10 +120,4 @@ public abstract class BasicACLPlugin implements ACLPlugin // no-op } - public boolean supportsTag(String name) - { - // This plugin doesn't support any tags - return false; - } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallFactory.java deleted file mode 100644 index a1a399e5bf..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.security.access.plugins.network; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.security.access.ACLPlugin; -import org.apache.qpid.server.security.access.ACLPluginFactory; - -public class FirewallFactory implements ACLPluginFactory -{ - - @Override - public ACLPlugin newInstance(Configuration config) throws ConfigurationException - { - FirewallPlugin plugin = new FirewallPlugin(); - plugin.setConfiguration(config); - return plugin; - } - - @Override - public boolean supportsTag(String name) - { - return name.equals("firewall"); - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index b34ef1c382..72d6afc65c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -599,7 +599,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) { _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString())); } else { @@ -612,9 +611,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) { _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString())); } } + CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString())); } public State getState() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java index f62b0c6241..317dee2b47 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java @@ -46,17 +46,4 @@ public class ExchangeDenier extends AllowAll { return AuthzResult.DENIED; } - - @Override - public String getPluginName() - { - return getClass().getSimpleName(); - } - - @Override - public boolean supportsTag(String name) - { - return name.equals("exchangeDenier"); - } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index 05ac3dca9e..6bae0166d1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -39,9 +39,4 @@ public class AMQAuthenticationException extends AMQException { super(error, msg, cause); } - public boolean isHardError() - { - return true; - } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 118be75705..2e3e417c95 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -65,6 +65,7 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -205,9 +206,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); - protected static final boolean DECLARE_QUEUES = + protected final boolean DECLARE_QUEUES = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); - protected static final boolean DECLARE_EXCHANGES = + protected final boolean DECLARE_EXCHANGES = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); /** System property to enable strict AMQP compliance. */ @@ -629,6 +630,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public void close(long timeout) throws JMSException { + close(timeout, true); + } + + private void close(long timeout, boolean sendClose) throws JMSException + { if (_logger.isInfoEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); @@ -654,9 +660,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // If the connection is open or we are in the process // of closing the connection then send a cance // no point otherwise as the connection will be gone - if (!_connection.isClosed() || _connection.isClosing()) + if (!_connection.isClosed() || _connection.isClosing()) { - sendClose(timeout); + if (sendClose) + { + sendClose(timeout); + } } } catch (AMQException e) @@ -712,25 +721,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (!_closed.getAndSet(true)) { - synchronized (getFailoverMutex()) + synchronized (_messageDeliveryLock) { - synchronized (_messageDeliveryLock) + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + AMQException amqe; + if (e instanceof AMQException) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); } + + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); } } } @@ -1737,6 +1743,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { + if (e instanceof AMQChannelClosedException) + { + close(-1, false); + } + JMSException ex = new JMSException("Error registering consumer: " + e); ex.setLinkedException(e); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index fa4e08f62b..d7196c0abb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -33,6 +33,7 @@ import org.apache.qpid.client.message.*; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; @@ -116,12 +117,23 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public void sendClose(long timeout) throws AMQException, FailoverException { - getProtocolHandler().closeSession(this); - getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), - ChannelCloseOkBody.class, timeout); - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully. + // we also need to check the state manager for 08/09 as the + // _connection variable may not be updated in time by the error receiving + // thread. + // We can't close the session if we are alreadying in the process of + // closing/closed the connection. + + if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED) + || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING))) + { + + getProtocolHandler().closeSession(this); + getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), + new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), + ChannelCloseOkBody.class, timeout); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. + } } public void sendCommit() throws AMQException, FailoverException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 2b6745ebe4..2cf19bf391 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -32,7 +32,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQConstant; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +47,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann } public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId) - throws AMQException + throws AMQException { _logger.debug("ChannelClose method received"); @@ -59,52 +58,62 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); } - - ChannelCloseOkBody body = session.getMethodRegistry().createChannelCloseOkBody(); AMQFrame frame = body.generateFrame(channelId); session.writeFrame(frame); - - if (errorCode != AMQConstant.REPLY_SUCCESS) + try { - if (_logger.isDebugEnabled()) - { - _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason); - } - - if (errorCode == AMQConstant.NO_CONSUMERS) - { - throw new AMQNoConsumersException("Error: " + reason, null, null); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - throw new AMQNoRouteException("Error: " + reason, null, null); - } - else if (errorCode == AMQConstant.INVALID_ARGUMENT) + if (errorCode != AMQConstant.REPLY_SUCCESS) { - _logger.debug("Broker responded with Invalid Argument."); + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason); + } + + if (errorCode == AMQConstant.NO_CONSUMERS) + { + throw new AMQNoConsumersException("Error: " + reason, null, null); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + throw new AMQNoRouteException("Error: " + reason, null, null); + } + else if (errorCode == AMQConstant.INVALID_ARGUMENT) + { + _logger.debug("Broker responded with Invalid Argument."); + + throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null); + } + else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) + { + _logger.debug("Broker responded with Invalid Routing Key."); + + throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null); + } + else + { + + throw new AMQChannelClosedException(errorCode, "Error: " + reason, null); + } - throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null); - } - else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) - { - _logger.debug("Broker responded with Invalid Routing Key."); - - throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null); - } - else - { - throw new AMQChannelClosedException(errorCode, "Error: " + reason, null); } - } - // fixme why is this only done when the close is expected... - // should the above forced closes not also cause a close? - // ---------- - // Closing the session only when it is expected allows the errors to be processed - // Calling this here will prevent failover. So we should do this for all exceptions - // that should never cause failover. Such as authentication errors. - - session.channelClosed(channelId, errorCode, String.valueOf(reason)); + finally + { + // fixme why is this only done when the close is expected... + // should the above forced closes not also cause a close? + // ---------- + // Closing the session only when it is expected allows the errors to be processed + // Calling this here will prevent failover. So we should do this for all exceptions + // that should never cause failover. Such as authentication errors. + // ---- + // 2009-09-07 - ritchiem + // calling channelClosed will only close this session and will not + // prevent failover. If we don't close the session here then we will + // have problems during the session close as it will attempt to + // close the session that the broker has closed, + + session.channelClosed(channelId, errorCode, String.valueOf(reason)); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java index e639a33450..e40cafd72f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java @@ -40,7 +40,6 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<C } public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId) - throws AMQException { session.getStateManager().changeState(AMQState.CONNECTION_OPEN); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index e4e58c317d..287b5957a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -45,7 +45,6 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con { } public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId) - throws AMQException { _logger.debug("ConnectionTune frame received"); final MethodRegistry methodRegistry = session.getMethodRegistry(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java index f2aca58deb..8782e00a12 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java @@ -52,7 +52,7 @@ public class AMQIoTransportProtocolSession extends AMQProtocolSession } @Override - public void closeProtocolSession(boolean waitLast) throws AMQException + public void closeProtocolSession(boolean waitLast) { _ioSender.close(); _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index e3a1a82dc4..ab3ff8ecb0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -259,7 +259,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** * Called when we want to create a new IoTransport session - * @param brokerDetail + * @param brokerDetail */ public void createIoTransportSession(BrokerDetails brokerDetail) { @@ -271,7 +271,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter brokerDetail.useSSL()); _protocolSession.init(); } - + /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case @@ -433,12 +433,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param e the exception to propagate * * @see #propagateExceptionToFrameListeners - * @see #propagateExceptionToStateWaiters */ public void propagateExceptionToAllWaiters(Exception e) { + getStateManager().error(e); propagateExceptionToFrameListeners(e); - propagateExceptionToStateWaiters(e); } /** @@ -469,22 +468,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } - /** - * This caters for the case where we only need to propogate an exception to the the state manager to interupt any - * thing waiting for a state change. - * - * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement. - * - * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal - * cases {@link #propagateExceptionToAllWaiters} would be the correct choice. - * - * @param e the exception to propagate - */ - public void propagateExceptionToStateWaiters(Exception e) - { - getStateManager().error(e); - } - public void notifyFailoverStarting() { // Set the last exception in the sync block to ensure the ordering with add. @@ -601,7 +584,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _protocolLogger.debug(String.format("SEND: [%s] %s", this, message)); } - + final long sentMessages = _messagesOut++; final boolean debug = _logger.isDebugEnabled(); @@ -667,7 +650,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter throw _lastFailoverException; } - if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED) + if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED || + _stateManager.getCurrentState() == AMQState.CONNECTION_CLOSING) { Exception e = _stateManager.getLastException(); if (e != null) @@ -733,25 +717,31 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void closeConnection(long timeout) throws AMQException { - getStateManager().changeState(AMQState.CONNECTION_CLOSING); - ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode new AMQShortString("JMS client is closing the connection."), 0, 0); final AMQFrame frame = body.generateFrame(0); - try - { - syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _protocolSession.closeProtocolSession(); - } - catch (AMQTimeoutException e) + //If the connection is already closed then don't do a syncWrite + if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) { _protocolSession.closeProtocolSession(false); } - catch (FailoverException e) + else { - _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + try + { + syncWrite(frame, ConnectionCloseOkBody.class, timeout); + _protocolSession.closeProtocolSession(); + } + catch (AMQTimeoutException e) + { + _protocolSession.closeProtocolSession(false); + } + catch (FailoverException e) + { + _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 5e12a5e6f8..0e872170aa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -410,12 +410,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); } - public void closeProtocolSession() throws AMQException + public void closeProtocolSession() { closeProtocolSession(true); } - public void closeProtocolSession(boolean waitLast) throws AMQException + public void closeProtocolSession(boolean waitLast) { _logger.debug("Waiting for last write to join."); if (waitLast && (_lastWriteFuture != null)) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index f8645139f2..70d4697f2c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.Set; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.io.IOException; /** * The state manager is responsible for managing the state of the protocol session. <p/> @@ -86,7 +87,7 @@ public class AMQStateManager implements AMQMethodListener return _currentState; } - public void changeState(AMQState newState) throws AMQException + public void changeState(AMQState newState) { _logger.debug("State changing to " + newState + " from old state " + _currentState); @@ -136,6 +137,22 @@ public class AMQStateManager implements AMQMethodListener */ public void error(Exception error) { + if (error instanceof AMQException) + { + // AMQException should be being notified before closing the + // ProtocolSession. Which will change the State to CLOSED. + // if we have a hard error. + if (((AMQException)error).isHardError()) + { + changeState(AMQState.CONNECTION_CLOSING); + } + } + else + { + // Be on the safe side here and mark the connection closed + changeState(AMQState.CONNECTION_CLOSED); + } + if (_waiters.size() == 0) { _logger.error("No Waiters for error saving as last error:" + error.getMessage()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index bddbc329ab..ee7fc533a3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -22,10 +22,11 @@ package org.apache.qpid.client.util; import java.util.Iterator; import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -36,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class FlowControllingBlockingQueue { + private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final Queue _queue = new ConcurrentLinkedQueue(); @@ -46,6 +49,8 @@ public class FlowControllingBlockingQueue /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + + private boolean disableFlowControl; public boolean isEmpty() { @@ -69,6 +74,10 @@ public class FlowControllingBlockingQueue _flowControlHighThreshold = highThreshold; _flowControlLowThreshold = lowThreshold; _listener = listener; + if (highThreshold == 0) + { + disableFlowControl = true; + } } public Object take() throws InterruptedException @@ -84,7 +93,7 @@ public class FlowControllingBlockingQueue } } } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { @@ -93,6 +102,7 @@ public class FlowControllingBlockingQueue _listener.underThreshold(_count); } } + } return o; @@ -106,7 +116,7 @@ public class FlowControllingBlockingQueue notifyAll(); } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index ce79080e97..da44822ec3 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -85,7 +85,7 @@ public class MockAMQConnection extends AMQConnection } @Override - public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException { _connected = true; _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 10ec220d9e..fc7f8148f0 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -200,15 +200,8 @@ public class AMQProtocolHandlerTest extends TestCase _handler.getStateManager().error(trigger); _logger.info("Setting state to be CONNECTION_CLOSED."); - try - { - _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - } - catch (AMQException e) - { - _logger.error("Unable to change the state to closed.", e); - fail("Unable to change the state to closed due to :"+e.getMessage()); - } + + _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); _logger.info("Firing exception"); _handler.propagateExceptionToFrameListeners(trigger); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 8df3644929..e34103a944 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -30,7 +30,6 @@ import java.nio.ByteBuffer; import javax.net.ssl.SSLEngine; -import org.apache.log4j.Logger; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; @@ -59,6 +58,9 @@ import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.NetworkDriverConfiguration; import org.apache.qpid.transport.OpenException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { @@ -80,7 +82,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver private WriteFuture _lastWriteFuture; - private static final Logger _logger = Logger.getLogger(MINANetworkDriver.class); + private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class); public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) { diff --git a/qpid/java/doc/broker-priority-queue-subscription.dia b/qpid/java/doc/broker-priority-queue-subscription.dia Binary files differnew file mode 100644 index 0000000000..2289899435 --- /dev/null +++ b/qpid/java/doc/broker-priority-queue-subscription.dia diff --git a/qpid/java/doc/broker-queue-subscription.dia b/qpid/java/doc/broker-queue-subscription.dia Binary files differnew file mode 100644 index 0000000000..d146ad136d --- /dev/null +++ b/qpid/java/doc/broker-queue-subscription.dia diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java index c3348b32f0..d88e0f38bb 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java @@ -59,7 +59,12 @@ public class ClientListener implements NotificationListener else if (JMXConnectionNotification.FAILED.equals(type)) { ApplicationRegistry.serverConnectionClosed(server); - MBeanUtility.printOutput("Recieved notification from " + server.getName() + ": " + type ); + MBeanUtility.printOutput("JMX Connection to " + server.getName() + " failed."); + } + else if (JMXConnectionNotification.CLOSED.equals(type)) + { + ApplicationRegistry.serverConnectionClosed(server); + MBeanUtility.printOutput("JMX Connection to " + server.getName() + " was closed."); } } diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java index 2b459c858f..ca07e6acf4 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java @@ -22,6 +22,7 @@ package org.apache.qpid.management.ui.jmx; import static org.apache.qpid.management.ui.Constants.ALL; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -29,7 +30,6 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import javax.management.ListenerNotFoundException; import javax.management.MBeanInfo; import javax.management.MBeanServerConnection; import javax.management.Notification; @@ -116,25 +116,54 @@ public class JMXServerRegistry extends ServerRegistry * removes all listeners from the mbean server. This is required when user * disconnects the Qpid server connection */ - public void closeServerConnection() throws Exception + public void closeServerConnection() throws IOException { try { + //remove the listener from the JMXConnector if (_jmxc != null && _clientListener != null) + { _jmxc.removeConnectionNotificationListener(_clientListener); + } + } + catch (Exception e) + { + //ignore + } + try + { + //remove the listener from the MBeanServerDelegate MBean if (_mbsc != null && _clientListener != null) + { _mbsc.removeNotificationListener(_serverObjectName, _clientListener); + } + } + catch (Exception e) + { + //ignore + } - // remove mbean notification listeners + if (_mbsc != null && _clientListener != null) + { + //remove any listeners from the Qpid MBeans for (String mbeanName : _subscribedNotificationMap.keySet()) { - _mbsc.removeNotificationListener(new ObjectName(mbeanName), _notificationListener); + try + { + _mbsc.removeNotificationListener(new ObjectName(mbeanName), _notificationListener); + } + catch (Exception e) + { + //ignore + } } } - catch (ListenerNotFoundException ex) + + //close the JMXConnector + if (_jmxc != null) { - MBeanUtility.printOutput(ex.toString()); + _jmxc.close(); } } diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java index 2408faae3a..f21647b2d2 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java @@ -882,7 +882,7 @@ public class AttributesTabControl extends TabControl { attribute = (AttributeData) element; if (attribute.isWritable()) - return Display.getCurrent().getSystemColor(SWT.COLOR_DARK_BLUE); + return Display.getCurrent().getSystemColor(SWT.COLOR_BLUE); else return Display.getCurrent().getSystemColor(SWT.COLOR_BLACK); } diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java index 0d290ab1c4..056d365f8e 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java @@ -790,9 +790,11 @@ public class NavigationView extends ViewPart return; } - serverRegistry.closeServerConnection(); // Add server to the closed server list and the worker thread will remove the server from required places. ApplicationRegistry.serverConnectionClosed(managedServer); + + //close the connection + serverRegistry.closeServerConnection(); } /** diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/connection/ConnectionOperationsTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/connection/ConnectionOperationsTabControl.java index e981ec1c3c..f82d37dcd1 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/connection/ConnectionOperationsTabControl.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/connection/ConnectionOperationsTabControl.java @@ -28,10 +28,13 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularDataSupport; +import org.apache.qpid.management.ui.ApplicationRegistry; import org.apache.qpid.management.ui.ManagedBean; +import org.apache.qpid.management.ui.ServerRegistry; import org.apache.qpid.management.common.mbeans.ManagedConnection; import org.apache.qpid.management.ui.jmx.JMXManagedObject; import org.apache.qpid.management.ui.jmx.MBeanUtility; +import org.apache.qpid.management.ui.views.MBeanView; import org.apache.qpid.management.ui.views.TabControl; import org.apache.qpid.management.ui.views.ViewUtility; import org.eclipse.jface.viewers.ISelectionChangedListener; @@ -43,6 +46,8 @@ import org.eclipse.jface.viewers.TableViewer; import org.eclipse.jface.viewers.Viewer; import org.eclipse.jface.viewers.ViewerSorter; import org.eclipse.swt.SWT; +import org.eclipse.swt.events.MouseEvent; +import org.eclipse.swt.events.MouseListener; import org.eclipse.swt.events.SelectionAdapter; import org.eclipse.swt.events.SelectionEvent; import org.eclipse.swt.graphics.Image; @@ -55,6 +60,8 @@ import org.eclipse.swt.widgets.Group; import org.eclipse.swt.widgets.TabFolder; import org.eclipse.swt.widgets.Table; import org.eclipse.swt.widgets.TableColumn; +import org.eclipse.ui.IWorkbenchWindow; +import org.eclipse.ui.PlatformUI; import org.eclipse.ui.forms.widgets.Form; import org.eclipse.ui.forms.widgets.FormToolkit; @@ -199,6 +206,8 @@ public class ConnectionOperationsTabControl extends TabControl _tableViewer.setContentProvider(new ContentProviderImpl()); _tableViewer.setLabelProvider(new LabelProviderImpl()); _tableViewer.setSorter(tableSorter); + _table.setSortColumn(_table.getColumn(0)); + _table.setSortDirection(SWT.UP); Composite buttonsComposite = _toolkit.createComposite(viewChannelsGroup); gridData = new GridData(SWT.RIGHT, SWT.BOTTOM, false, false); @@ -278,6 +287,19 @@ public class ConnectionOperationsTabControl extends TabControl } }); + //listener for double clicking to open the selection mbean + _table.addMouseListener(new MouseListener() + { + // MouseListener implementation + public void mouseDoubleClick(MouseEvent event) + { + openMBean(_table); + } + + public void mouseDown(MouseEvent e){} + public void mouseUp(MouseEvent e){} + }); + _tableViewer.addSelectionChangedListener(new ISelectionChangedListener(){ public void selectionChanged(SelectionChangedEvent evt) { @@ -465,5 +487,43 @@ public class ConnectionOperationsTabControl extends TabControl return comparison; } } + + private void openMBean(Table table) + { + int selectionIndex = table.getSelectionIndex(); + + if (selectionIndex == -1) + { + return; + } + + CompositeData channelResult = (CompositeData) table.getItem(selectionIndex).getData(); + String queueName = (String) channelResult.get(DEFAULT_QUEUE); + + if(queueName == null) + { + return; + } + + ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(_mbean); + ManagedBean selectedMBean = serverRegistry.getQueue(queueName, _mbean.getVirtualHostName()); + + if(selectedMBean == null) + { + ViewUtility.popupErrorMessage("Error", "Unable to retrieve the selected MBean to open it"); + return; + } + + IWorkbenchWindow window = PlatformUI.getWorkbench().getActiveWorkbenchWindow(); + MBeanView view = (MBeanView) window.getActivePage().findView(MBeanView.ID); + try + { + view.openMBean(selectedMBean); + } + catch (Exception ex) + { + MBeanUtility.handleException(selectedMBean, ex); + } + } } diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/ExchangeOperationsTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/ExchangeOperationsTabControl.java index 35b3e8150c..e3dea6e96b 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/ExchangeOperationsTabControl.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/ExchangeOperationsTabControl.java @@ -237,6 +237,8 @@ public class ExchangeOperationsTabControl extends TabControl _keysTableViewer.setContentProvider(new ContentProviderImpl(BINDING_KEY)); _keysTableViewer.setLabelProvider(new LabelProviderImpl(BINDING_KEY)); _keysTableViewer.setSorter(tableSorter); + _keysTable.setSortColumn(_keysTable.getColumn(0)); + _keysTable.setSortDirection(SWT.UP); _queuesTable = new Table (tablesComposite, SWT.SINGLE | SWT.SCROLL_LINE | SWT.BORDER | SWT.FULL_SELECTION); @@ -287,6 +289,8 @@ public class ExchangeOperationsTabControl extends TabControl _queuesTableViewer.setContentProvider(new ContentProviderImpl(QUEUES)); _queuesTableViewer.setLabelProvider(new LabelProviderImpl(QUEUES)); _queuesTableViewer.setSorter(queuesTableSorter); + _queuesTable.setSortColumn(_queuesTable.getColumn(0)); + _queuesTable.setSortDirection(SWT.UP); _queuesTableViewer.setInput(new String[]{"Select a binding key to view queues"}); //listener for double clicking to open the selection mbean diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/HeadersExchangeOperationsTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/HeadersExchangeOperationsTabControl.java index 5146bab74c..fcce0e67b6 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/HeadersExchangeOperationsTabControl.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/HeadersExchangeOperationsTabControl.java @@ -22,6 +22,7 @@ package org.apache.qpid.management.ui.views.exchange; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import javax.management.MBeanServerConnection; @@ -48,6 +49,7 @@ import org.eclipse.jface.viewers.TableViewer; import org.eclipse.jface.viewers.Viewer; import org.eclipse.jface.viewers.ViewerSorter; import org.eclipse.swt.SWT; +import org.eclipse.swt.custom.ScrolledComposite; import org.eclipse.swt.events.MouseEvent; import org.eclipse.swt.events.MouseListener; import org.eclipse.swt.events.SelectionAdapter; @@ -60,6 +62,7 @@ import org.eclipse.swt.widgets.Combo; import org.eclipse.swt.widgets.Composite; import org.eclipse.swt.widgets.Control; import org.eclipse.swt.widgets.Group; +import org.eclipse.swt.widgets.Label; import org.eclipse.swt.widgets.Shell; import org.eclipse.swt.widgets.TabFolder; import org.eclipse.swt.widgets.Table; @@ -182,7 +185,7 @@ public class HeadersExchangeOperationsTabControl extends TabControl final TableSorter tableSorter = new TableSorter(BINDING_NUM); String[] titles = {"Binding Number", "Queue Name"}; - int[] bounds = {125, 175}; + int[] bounds = {135, 175}; for (int i = 0; i < titles.length; i++) { final int index = i; @@ -220,6 +223,8 @@ public class HeadersExchangeOperationsTabControl extends TabControl _bindingNumberTableViewer.setContentProvider(new ContentProviderImpl(BINDING_NUM)); _bindingNumberTableViewer.setLabelProvider(new LabelProviderImpl(BINDING_NUM)); _bindingNumberTableViewer.setSorter(tableSorter); + _bindingNumberTable.setSortColumn(_bindingNumberTable.getColumn(0)); + _bindingNumberTable.setSortDirection(SWT.UP); //table of header bindings _headersTable = new Table (tablesComposite, SWT.SINGLE | SWT.SCROLL_LINE | SWT.BORDER | SWT.FULL_SELECTION); @@ -272,6 +277,8 @@ public class HeadersExchangeOperationsTabControl extends TabControl _headersTableViewer.setContentProvider(new ContentProviderImpl(HEADER_BINDINGS)); _headersTableViewer.setLabelProvider(new LabelProviderImpl(HEADER_BINDINGS)); _headersTableViewer.setSorter(queuesTableSorter); + _headersTable.setSortColumn(_headersTable.getColumn(0)); + _headersTable.setSortDirection(SWT.UP); _headersTableViewer.setInput(new String[]{"Select a binding to view key-value pairs"}); _bindingNumberTableViewer.addSelectionChangedListener(new ISelectionChangedListener(){ @@ -490,25 +497,38 @@ public class HeadersExchangeOperationsTabControl extends TabControl private void createNewBinding(Shell parent) { final Shell shell = ViewUtility.createModalDialogShell(parent, "Create New Binding"); - - Composite destinationComposite = _toolkit.createComposite(shell, SWT.NONE); - destinationComposite.setBackground(shell.getBackground()); - destinationComposite.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); - destinationComposite.setLayout(new GridLayout(2,false)); + + Composite queueNameComposite = _toolkit.createComposite(shell, SWT.NONE); + queueNameComposite.setBackground(shell.getBackground()); + GridData layoutData = new GridData(SWT.CENTER, SWT.TOP, true, false); + layoutData.minimumWidth = 300; + queueNameComposite.setLayoutData(layoutData); + queueNameComposite.setLayout(new GridLayout(2,false)); - _toolkit.createLabel(destinationComposite,"Queue:").setBackground(shell.getBackground()); - final Combo destinationCombo = new Combo(destinationComposite,SWT.NONE | SWT.READ_ONLY); + _toolkit.createLabel(queueNameComposite,"Queue:").setBackground(shell.getBackground()); + final Combo destinationCombo = new Combo(queueNameComposite,SWT.NONE | SWT.READ_ONLY); destinationCombo.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); - Composite bindingComposite = _toolkit.createComposite(shell, SWT.NONE); - bindingComposite.setBackground(shell.getBackground()); - bindingComposite.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); - bindingComposite.setLayout(new GridLayout(2,false)); + final ScrolledComposite scrolledComposite = new ScrolledComposite(shell, SWT.V_SCROLL); + scrolledComposite.setExpandHorizontal(true); + scrolledComposite.setLayout(new GridLayout()); + scrolledComposite.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + scrolledComposite.setBackground(shell.getBackground()); + + final Composite bindingComposite = _toolkit.createComposite(scrolledComposite, SWT.NONE); + bindingComposite.setBackground(scrolledComposite.getBackground()); + bindingComposite.setLayout(new GridLayout(2,true)); + layoutData = new GridData(SWT.FILL, SWT.TOP, true, false); + bindingComposite.setLayoutData(layoutData); + scrolledComposite.setContent(bindingComposite); + + Composite addMoreButtonComp = _toolkit.createComposite(shell); + addMoreButtonComp.setBackground(shell.getBackground()); + addMoreButtonComp.setLayoutData(new GridData(SWT.RIGHT, SWT.FILL, true, true)); + addMoreButtonComp.setLayout(new GridLayout()); + + final Button addMoreButton = _toolkit.createButton(addMoreButtonComp, "Add additional field", SWT.PUSH); - _toolkit.createLabel(bindingComposite,"Binding:").setBackground(shell.getBackground()); - final Text bindingText = new Text(bindingComposite, SWT.BORDER); - bindingText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); - Composite okCancelButtonsComp = _toolkit.createComposite(shell); okCancelButtonsComp.setBackground(shell.getBackground()); okCancelButtonsComp.setLayoutData(new GridData(SWT.RIGHT, SWT.FILL, true, true)); @@ -532,26 +552,106 @@ public class HeadersExchangeOperationsTabControl extends TabControl destinationCombo.setItems(queueList.toArray(new String[0])); } destinationCombo.select(0); + + final HashMap<Text, Text> headerBindingHashMap = new HashMap<Text, Text>(); + + //add headings + Label keyLabel = _toolkit.createLabel(bindingComposite,"Key:"); + keyLabel.setBackground(bindingComposite.getBackground()); + keyLabel.setLayoutData(new GridData(SWT.CENTER, SWT.TOP, true, false)); + + Label valueLabel = _toolkit.createLabel(bindingComposite,"Value:"); + valueLabel.setBackground(bindingComposite.getBackground()); + valueLabel.setLayoutData(new GridData(SWT.CENTER, SWT.TOP, true, false)); + + //add the x-match key by default and offer a comobo to select its value + final Text xmatchKeyText = new Text(bindingComposite, SWT.BORDER); + xmatchKeyText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + xmatchKeyText.setText("x-match"); + xmatchKeyText.setEditable(false); + + final Combo xmatchValueCombo = new Combo(bindingComposite,SWT.NONE | SWT.READ_ONLY); + xmatchValueCombo.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + xmatchValueCombo.setItems(new String[]{"any", "all"}); + xmatchValueCombo.select(0); + + //make some empty key-value fields + for(int i=0; i < 4; i++) + { + Text keyText = new Text(bindingComposite, SWT.BORDER); + Text valueText = new Text(bindingComposite, SWT.BORDER); + keyText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + valueText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + + headerBindingHashMap.put(keyText, valueText); + } + bindingComposite.setSize(bindingComposite.computeSize(SWT.DEFAULT, SWT.DEFAULT)); + + //allow adding more fields for additional key-value pairs + addMoreButton.addSelectionListener(new SelectionAdapter() + { + public void widgetSelected(SelectionEvent e) + { + Text keyText = new Text(bindingComposite, SWT.BORDER); + Text valueText = new Text(bindingComposite, SWT.BORDER); + keyText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + valueText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + + headerBindingHashMap.put(keyText, valueText); + + bindingComposite.setSize(bindingComposite.computeSize(SWT.DEFAULT, SWT.DEFAULT)); + bindingComposite.layout(true); + scrolledComposite.layout(true); + } + }); okButton.addSelectionListener(new SelectionAdapter() { public void widgetSelected(SelectionEvent e) { - String binding = bindingText.getText(); + String xMatchString = xmatchValueCombo.getText(); - if (binding == null || binding.length() == 0) + String destQueue = destinationCombo.getItem(destinationCombo.getSelectionIndex()).toString(); + + StringBuffer bindingValue = new StringBuffer(); + + //insert the x-match key-value pair + if (xMatchString.equalsIgnoreCase("any")) { - ViewUtility.popupErrorMessage("Create New Binding", "Please enter a valid binding"); - return; + bindingValue.append("x-match=any"); + } + else + { + bindingValue.append("x-match=all"); } - String destQueue = destinationCombo.getItem(destinationCombo.getSelectionIndex()).toString(); + //insert the other key-value pairs + for (Text keyText : headerBindingHashMap.keySet()) + { + + String key = keyText.getText(); + if(key == null || key.length() == 0) + { + continue; + } + + Text valueText = headerBindingHashMap.get(keyText); + String value = valueText.getText(); + + bindingValue.append(","); + bindingValue.append(key + "="); + //empty values are permitted, signalling only key-presence is required + if(value != null && value.length() > 0) + { + bindingValue.append(value); + } + } shell.dispose(); try { - _emb.createNewBinding(destQueue, binding); + _emb.createNewBinding(destQueue, bindingValue.toString()); ViewUtility.operationResultFeedback(null, "Created new Binding", null); } catch (Exception e4) diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java index 3e8a5da5b5..1b1d08aa67 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java @@ -197,8 +197,8 @@ public class ConfigurationFileTabControl extends TabControl "NOTE: These options modify the configuration file. " + "Changes only take effect automatically if LogWatch is enabled."); Label noteLabel2 = _toolkit.createLabel(_headerComposite, - "A Logger set to a non-inherited Level in the Runtime tab " + - "will retain that value after the configuration is reloaded."); + "A child Logger set to a non-inherited Level in the Runtime tab " + + "will retain that value after the file is reloaded."); GridData gridData = new GridData(SWT.FILL, SWT.FILL, false, true); noteLabel.setLayoutData(gridData); gridData = new GridData(SWT.FILL, SWT.FILL, false, true); diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java index c7c7a1791a..43d2cfe204 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java @@ -299,8 +299,8 @@ public class QueueOperationsTabControl extends TabControl //message table Composite tableAndButtonsComposite = _toolkit.createComposite(messagesGroup); gridData = new GridData(SWT.FILL, SWT.FILL, true, true); - gridData.minimumHeight = 220; - gridData.heightHint = 220; + gridData.minimumHeight = 180; + gridData.heightHint = 180; tableAndButtonsComposite.setLayoutData(gridData); tableAndButtonsComposite.setLayout(new GridLayout(2,false)); @@ -358,6 +358,8 @@ public class QueueOperationsTabControl extends TabControl _tableViewer.setContentProvider(new ContentProviderImpl()); _tableViewer.setLabelProvider(new LabelProviderImpl()); _tableViewer.setSorter(tableSorter); + _table.setSortColumn(_table.getColumn(0)); + _table.setSortDirection(SWT.UP); //Side Buttons Composite buttonsComposite = _toolkit.createComposite(tableAndButtonsComposite); @@ -492,12 +494,12 @@ public class QueueOperationsTabControl extends TabControl headerEtcComposite.setLayoutData(gridData); headerEtcComposite.setLayout(new GridLayout()); - final Text headerText = new Text(headerEtcComposite, SWT.WRAP | SWT.BORDER ); + final Text headerText = new Text(headerEtcComposite, SWT.WRAP | SWT.BORDER | SWT.V_SCROLL); headerText.setText("Select a message to view its header."); headerText.setEditable(false); data = new GridData(SWT.LEFT, SWT.TOP, false, false); - data.minimumHeight = 230; - data.heightHint = 230; + data.minimumHeight = 210; + data.heightHint = 210; data.minimumWidth = 500; data.widthHint = 500; headerText.setLayoutData(data); @@ -570,10 +572,16 @@ public class QueueOperationsTabControl extends TabControl String[] msgHeader = (String[]) selectedMsg.get(MSG_HEADER); headerText.setText(""); String lineSeperator = System.getProperty("line.separator"); - for(String s: msgHeader) + int size = msgHeader.length; + for(int i=0; i < size; i++) { - headerText.append(s + lineSeperator); + headerText.append(msgHeader[i]); + if(!(i == size - 1)) + { + headerText.append(lineSeperator); + } } + headerText.setSelection(0); } if (_table.getSelectionCount() > 1) diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/vhost/VHostTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/vhost/VHostTabControl.java index fef1e9f887..3b03aeaff1 100644 --- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/vhost/VHostTabControl.java +++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/vhost/VHostTabControl.java @@ -205,6 +205,8 @@ public class VHostTabControl extends TabControl _queueTableViewer.setContentProvider(new ContentProviderImpl()); _queueTableViewer.setLabelProvider(new LabelProviderImpl()); _queueTableViewer.setSorter(tableSorter); + _queueTable.setSortColumn(_queueTable.getColumn(0)); + _queueTable.setSortDirection(SWT.UP); Composite queuesRightComposite = _toolkit.createComposite(queuesGroup); gridData = new GridData(SWT.FILL, SWT.FILL, false, true); @@ -314,6 +316,8 @@ public class VHostTabControl extends TabControl _exchangeTableViewer.setContentProvider(new ContentProviderImpl()); _exchangeTableViewer.setLabelProvider(new LabelProviderImpl()); _exchangeTableViewer.setSorter(exchangeTableSorter); + _exchangeTable.setSortColumn(_exchangeTable.getColumn(0)); + _exchangeTable.setSortDirection(SWT.UP); Composite exchangesRightComposite = _toolkit.createComposite(exchangesGroup); gridData = new GridData(SWT.FILL, SWT.FILL, false, true); diff --git a/qpid/java/management/eclipse-plugin/src/main/resources/win32-win32-x86/qpidmc.ini b/qpid/java/management/eclipse-plugin/src/main/resources/win32-win32-x86/qpidmc.ini index 9e3de042d5..312580769e 100644 --- a/qpid/java/management/eclipse-plugin/src/main/resources/win32-win32-x86/qpidmc.ini +++ b/qpid/java/management/eclipse-plugin/src/main/resources/win32-win32-x86/qpidmc.ini @@ -23,14 +23,14 @@ -XX:MaxPermSize=256m
-Dosgi.requiredJavaVersion=1.5
-Declipse.consoleLog=true
- -#=============================================== -# SSL trust store configuration options. -#=============================================== - -# Uncomment lines below to specify custom truststore for server SSL -# certificate verification, eg when using self-signed server certs. -# -#-Djavax.net.ssl.trustStore=<path.to.truststore> -#-Djavax.net.ssl.trustStorePassword=<truststore.password> - +
+#===============================================
+# SSL trust store configuration options.
+#===============================================
+
+# Uncomment lines below to specify custom truststore for server SSL
+# certificate verification, eg when using self-signed server certs.
+#
+#-Djavax.net.ssl.trustStore=<path.to.truststore>
+#-Djavax.net.ssl.trustStorePassword=<truststore.password>
+
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java index 7a2266902b..b4ba6e8156 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java @@ -29,6 +29,8 @@ import org.apache.qpid.server.logging.AbstractTestLogging; import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject; import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanServerConnection; @@ -38,6 +40,8 @@ import javax.management.remote.JMXConnector; import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * Test class to test if any change in the broker JMX code is affesting the management console @@ -161,6 +165,23 @@ public class ManagementActorLoggingTest extends AbstractTestLogging //Create a connection to the broker Connection connection = getConnection(); + // Monitor the connection for an exception being thrown + // this should be a DisconnectionException but it is not this tests + // job to valiate that. Only use the exception as a synchronisation + // to check the log file for the Close message + final CountDownLatch exceptionReceived = new CountDownLatch(1); + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + //Failover being attempted. + exceptionReceived.countDown(); + } + }); + + //Remove the connection close from any 0-10 connections + _monitor.reset(); + // Get all active AMQP connections AllObjects allObject = new AllObjects(_mbsc); allObject.querystring = "org.apache.qpid:type=VirtualHost.Connection,*"; @@ -175,16 +196,17 @@ public class ManagementActorLoggingTest extends AbstractTestLogging newProxyInstance(_mbsc, connectionName, ManagedConnection.class, false); - //Remove the connection close from any 0-10 connections - _monitor.reset(); //Close the connection mangedConnection.closeConnection(); + //Wait for the connection to close + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); + //Validate results List<String> results = _monitor.findMatches("CON-1002"); - assertEquals("Unexpected Connection Close count", 1, results.size()); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java index d7209c5660..5dd56fb0f9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java @@ -29,6 +29,7 @@ import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; +import javax.jms.Message; import java.io.IOException; import java.util.List; @@ -327,22 +328,45 @@ public class SubscriptionLoggingTest extends AbstractTestLogging int PREFETCH = 15; //Create new session with small prefetch - _session = ((AMQConnection) _connection).createSession(true, Session.AUTO_ACKNOWLEDGE, PREFETCH); + _session = ((AMQConnection) _connection).createSession(true, Session.SESSION_TRANSACTED, PREFETCH); MessageConsumer consumer = _session.createConsumer(_queue); _connection.start(); + //Start the dispatcher & Unflow the channel. + consumer.receiveNoWait(); + //Fill the prefetch and two extra so that our receive bellow allows the - // subscription to become active then return to a suspended state. - sendMessage(_session, _queue, 17); + // subscription to become active + // Previously we set this to 17 so that it would return to a suspended + // state. However, testing has shown that the state change can occur + // sufficiently quickly that logging does not occur consistently enough + // for testing. + int SEND_COUNT = 16; + sendMessage(_session, _queue, SEND_COUNT); _session.commit(); // Retreive the first message, and start the flow of messages - assertNotNull("First message not retreived", consumer.receive(1000)); + Message msg = consumer.receive(1000); + assertNotNull("First message not retreived", msg); _session.commit(); - - _connection.close(); + // Drain the queue to ensure there is time for the ACTIVE log message + // Check that we can received all the messages + int receivedCount = 0; + while (msg != null) + { + receivedCount++; + msg = consumer.receive(1000); + _session.commit(); + } + + //Validate we received all the messages + assertEquals("Not all sent messages received.", SEND_COUNT, receivedCount); + + // Fill the queue again to suspend the consumer + sendMessage(_session, _queue, SEND_COUNT); + _session.commit(); //Validate List<String> results = _monitor.findMatches("SUB-1003"); @@ -350,15 +374,13 @@ public class SubscriptionLoggingTest extends AbstractTestLogging try { // Validation expects three messages. - // The first will be logged by the QueueActor as part of the processQueue thread -// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED - // The second will be by the connnection as it acknowledges and activates the subscription -// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE - // The final one can be the subscription suspending as part of the SubFlushRunner or the processQueue thread - // As a result validating the actor is more complicated and doesn't add anything. The goal of this test is - // to ensure the State is correct not that a particular Actor performs the logging. -// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED -// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED + // The Actor can be any one of the following depending on the exactly what is going on on the broker. + // Ideally we would test that we can get all of them but setting up + // the timing to do this in a consistent way is not benefitial. + // Ensuring the State is as expected is sufficient. +// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : +// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : +// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : assertEquals("Result set not expected size:", 3, results.size()); @@ -367,19 +389,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging String log = getLog(results.get(0)); validateSubscriptionState(log, expectedState); - // Validate that the logActor is the the queue - String actor = fromActor(log); - assertTrue("Actor string does not contain expected queue(" - + _queue.getQueueName() + ") name." + actor, - actor.contains("qu(" + _queue.getQueueName() + ")")); - // After being suspended the subscription should become active. expectedState = "ACTIVE"; log = getLog(results.get(1)); validateSubscriptionState(log, expectedState); - // Validate we have a connection Actor - actor = fromActor(log); - assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:")); // Validate that it was re-suspended expectedState = "SUSPENDED"; @@ -396,6 +409,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging } throw afe; } + _connection.close(); + + //Ensure the queue is drained before the test ends + drainQueue(_queue); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java new file mode 100644 index 0000000000..c9810e7304 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import org.apache.qpid.test.utils.QpidTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * QPID-155 + * + * Test to validate that setting the respective qpid.declare_queues, + * qpid.declare_exchanges system properties functions as expected. + * + */ +public class DynamicQueueExchangeCreateTest extends QpidTestCase +{ + + public void testQueueDeclare() throws Exception + { + setSystemProperty("qpid.declare_queues", "false"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(getTestQueueName()); + + try + { + session.createConsumer(queue); + fail("JMSException should be thrown as the queue does not exist"); + } + catch (JMSException e) + { + assertTrue("Exception should be that the queue does not exist :" + + e.getMessage(), + e.getMessage().contains("does not exist")); + + } + } + + public void testExchangeDeclare() throws Exception + { + setSystemProperty("qpid.declare_exchanges", "false"); + + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String EXCHANGE_TYPE = "test.direct"; + Queue queue = session.createQueue("direct://" + EXCHANGE_TYPE + "/queue/queue"); + + try + { + session.createConsumer(queue); + fail("JMSException should be thrown as the exchange does not exist"); + } + catch (JMSException e) + { + assertTrue("Exception should be that the exchange does not exist :" + + e.getMessage(), + e.getMessage().contains("Exchange " + EXCHANGE_TYPE + " does not exist")); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java new file mode 100644 index 0000000000..3fb6cd3526 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.close; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.test.utils.QpidTestCase; + +import javax.jms.Session; + +/** QPID-1809 + * + * Race condition on error handling and close logic. + * + * See most often with SimpleACLTest as this test is the expects the server to + * shut the connection/channels. This sort of testing is not performed by many, + * if any, of the other system tests. + * + * The problem is that we have two threads + * + * MainThread Exception(Mina)Thread + * | | + * Performs | + * ACtion | + * | Receives Server + * | Close + * Blocks for | + * Response | + * | Starts To Notify + * | client + * | | + * | <----- Notify Main Thread + * Notification | + * wakes client | + * | | + * Client then | + * processes Error. | + * | | + * Potentially Attempting Close Channel/Connection + * Connection Close + * + * The two threads both attempt to close the connection but the main thread does + * so assuming that the connection is open and valid. + * + * The Exception thread must modify the connection so that no furter syncWait + * commands are performed. + * + * This test sends an ExchangeDeclare that is Asynchronous and will fail and + * so cause a ChannelClose error but we perform a syncWait so that we can be + * sure to test that the BlockingWaiter is correctly awoken. + * + */ +public class JavaServerCloseRaceConditionTest extends QpidTestCase +{ + private static final String EXCHANGE_NAME = "NewExchangeNametoFailLookup"; + + public void test() throws Exception + { + + AMQConnection connection = (AMQConnection) getConnection(); + + AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set no wait true so that we block the connection + // Also set a different exchange class string so the attempt to declare + // the exchange causes an exchange. + ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null, + true, false, false, false, true, null); + + AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId()); + + try + { + // block our thread so that can times out + connection.getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + catch (Exception e) + { + assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME)); + } + + try + { + // Depending on if the notification thread has closed the connection + // or not we may get an exception here when we attempt to close the + // connection. If we do get one then it should be the same as above + // an AMQAuthenticationException. + connection.close(); + } + catch (Exception e) + { + assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME)); + } + + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index c5cdb83bbf..2a44413ac8 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -51,7 +51,13 @@ import javax.jms.TopicSubscriber; public class DurableSubscriptionTest extends QpidTestCase { private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class); - + + /** Timeout for receive() if we are expecting a message */ + private static final long POSITIVE_RECEIVE_TIMEOUT = 2000; + + /** Timeout for receive() if we are not expecting a message */ + private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000; + public void testUnsubscribe() throws Exception { AMQConnection con = (AMQConnection) getConnection("guest", "guest"); @@ -76,16 +82,18 @@ public class DurableSubscriptionTest extends QpidTestCase Message msg; _logger.info("Receive message on consumer 1:expecting A"); - msg = consumer1.receive(); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); - _logger.info("Receive message on consumer 1:expecting A"); - msg = consumer2.receive(); + _logger.info("Receive message on consumer 2:expecting A"); + msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer2.receive(1000); + msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); @@ -96,14 +104,15 @@ public class DurableSubscriptionTest extends QpidTestCase producer.send(session1.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); - msg = consumer1.receive(); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); _logger.info("Receive message on consumer 2 :expecting null"); - msg = consumer2.receive(1000); + msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); _logger.info("Close connection"); @@ -143,14 +152,16 @@ public class DurableSubscriptionTest extends QpidTestCase producer.send(session1.createTextMessage("A")); Message msg; - msg = consumer1.receive(); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); - msg = consumer2.receive(); + msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer2.receive(1000); + msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); consumer2.close(); @@ -220,8 +231,8 @@ public class DurableSubscriptionTest extends QpidTestCase msg = consumer1.receive(500); assertNull("There should be no more messages for consumption on consumer1.", msg); - msg = consumer2.receive(); - assertNotNull(msg); + msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Message should have been received",msg); assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); msg = consumer2.receive(500); assertNull("There should be no more messages for consumption on consumer2.", msg); @@ -235,10 +246,10 @@ public class DurableSubscriptionTest extends QpidTestCase producer.send(session0.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals("B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); - msg = consumer1.receive(1000); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed. @@ -296,7 +307,7 @@ public class DurableSubscriptionTest extends QpidTestCase producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); - Message msg = liveSubscriber.receive(); + Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull ("Message should have been received", msg); assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); @@ -331,7 +342,7 @@ public class DurableSubscriptionTest extends QpidTestCase assertNotNull("Subscriber should have been created", liveSubscriber); producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); - Message msg = liveSubscriber.receive(); + Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull ("Message should have been received", msg); assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); @@ -360,13 +371,13 @@ public class DurableSubscriptionTest extends QpidTestCase // Send 1 matching message and 1 non-matching message sendMatchingAndNonMatchingMessage(session, producer); - Message rMsg = subA.receive(1000); + Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelector1", ((TextMessage) rMsg).getText()); - rMsg = subA.receive(1000); + rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull(rMsg); // Disconnect subscriber @@ -379,13 +390,13 @@ public class DurableSubscriptionTest extends QpidTestCase // Check messages are recieved properly sendMatchingAndNonMatchingMessage(session, producer); - rMsg = subB.receive(1000); + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelector2", ((TextMessage) rMsg).getText()); - rMsg = subB.receive(1000); + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull(rMsg); session.unsubscribe("testResubscribeWithChangedSelector"); } @@ -429,5 +440,5 @@ public class DurableSubscriptionTest extends QpidTestCase public static junit.framework.Test suite() { return new junit.framework.TestSuite(DurableSubscriptionTest.class); - } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 9c755fcb41..b603455644 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -479,7 +479,7 @@ public class CommitRollbackTest extends QpidTestCase _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); _pubSession.commit(); - assertNotNull(_consumer.receive(100)); + assertNotNull(_consumer.receive(1000)); _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index cc9cfce34b..1bef07fcd5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -55,7 +55,7 @@ public class FailoverBaseCase extends QpidTestCase { super.setUp(); setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort()); - startBroker(FAILING_PORT); + startBroker(failingPort); } /** @@ -76,7 +76,7 @@ public class FailoverBaseCase extends QpidTestCase public void tearDown() throws Exception { - stopBroker(FAILING_PORT); + stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT); super.tearDown(); FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort()); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java index df8dd0b85b..44ac5b4838 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java @@ -31,6 +31,7 @@ import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; import java.io.OutputStreamWriter; +import java.util.ArrayList; import java.util.List; /** @@ -118,7 +119,7 @@ public class LogMonitor * @throws java.io.FileNotFoundException if the Log file can nolonger be found * @throws IOException thrown when reading the log file */ - public boolean waitForMessage(String message, long wait) + public boolean waitForMessage(String message, long wait, boolean printFileOnFailure) throws FileNotFoundException, IOException { // Loop through alerts until we're done or wait ms seconds have passed, @@ -126,20 +127,35 @@ public class LogMonitor BufferedReader reader = new BufferedReader(new FileReader(_logfile)); boolean found = false; long endtime = System.currentTimeMillis() + wait; + ArrayList<String> contents = new ArrayList<String>(); while (!found && System.currentTimeMillis() < endtime) { while (reader.ready()) { String line = reader.readLine(); + contents.add(line); if (line.contains(message)) { found = true; } } } - + if (!found && printFileOnFailure) + { + for (String line : contents) + { + System.out.println(line); + } + } return found; } + + + public boolean waitForMessage(String messageCountAlert, long alertLogWaitPeriod) throws FileNotFoundException, IOException + { + return waitForMessage(messageCountAlert, alertLogWaitPeriod, true); + } + /** * Read the log file in to memory as a String diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java index d1a0df30a9..2b9fe8e039 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java @@ -30,25 +30,25 @@ import java.util.List; public class LogMonitorTest extends TestCase { + private LogMonitor _monitor; + + @Override + public void setUp() throws Exception + { + _monitor = new LogMonitor(); + _monitor.getMonitoredFile().deleteOnExit(); // Make sure we clean up + } + /** * Test that a new file is created when attempting to set up a monitor with * the default constructor. */ public void testMonitor() { - // Validate that a NPE is thrown with null input - try - { - LogMonitor montior = new LogMonitor(); - //Validte that the monitor is now running on a new file - assertTrue("New file does not have correct name:" + montior. - getMonitoredFile().getName(), - montior.getMonitoredFile().getName().contains("LogMonitor")); - } - catch (IOException ioe) - { - fail("IOE thrown:" + ioe); - } + //Validate that the monitor is now running on a new file + assertTrue("New file does not have correct name:" + _monitor. + getMonitoredFile().getName(), + _monitor.getMonitoredFile().getName().contains("LogMonitor")); } /** @@ -63,13 +63,11 @@ public class LogMonitorTest extends TestCase File testFile = File.createTempFile("testMonitorFile", ".log"); testFile.deleteOnExit(); - LogMonitor monitor; - //Ensure that we can create a monitor on a file try { - monitor = new LogMonitor(testFile); - assertEquals(testFile, monitor.getMonitoredFile()); + _monitor = new LogMonitor(testFile); + assertEquals(testFile, _monitor.getMonitoredFile()); } catch (IOException ioe) { @@ -136,13 +134,12 @@ public class LogMonitorTest extends TestCase */ public void testFindMatches_Match() throws IOException { - LogMonitor monitor = new LogMonitor(); String message = getName() + ": Test Message"; Logger.getRootLogger().warn(message); - validateLogContainsMessage(monitor, message); + validateLogContainsMessage(_monitor, message); } /** @@ -152,35 +149,17 @@ public class LogMonitorTest extends TestCase */ public void testFindMatches_NoMatch() throws IOException { - LogMonitor monitor = new LogMonitor(); - String message = getName() + ": Test Message"; Logger.getRootLogger().warn(message); String notLogged = "This text was not logged"; - validateLogDoesNotContainsMessage(monitor, notLogged); - } - - public void testWaitForMessage_Found() throws IOException - { - LogMonitor monitor = new LogMonitor(); - - String message = getName() + ": Test Message"; - - long TIME_OUT = 2000; - - logMessageWithDelay(message, TIME_OUT / 2); - - assertTrue("Message was not logged ", - monitor.waitForMessage(message, TIME_OUT)); + validateLogDoesNotContainsMessage(_monitor, notLogged); } public void testWaitForMessage_Timeout() throws IOException { - LogMonitor monitor = new LogMonitor(); - String message = getName() + ": Test Message"; long TIME_OUT = 2000; @@ -189,41 +168,37 @@ public class LogMonitorTest extends TestCase // Verify that we can time out waiting for a message assertFalse("Message was logged ", - monitor.waitForMessage(message, TIME_OUT / 2)); + _monitor.waitForMessage(message, TIME_OUT / 2, false)); // Verify that the message did eventually get logged. assertTrue("Message was never logged.", - monitor.waitForMessage(message, TIME_OUT)); + _monitor.waitForMessage(message, TIME_OUT)); } public void testReset() throws IOException { - LogMonitor monitor = new LogMonitor(); - String message = getName() + ": Test Message"; Logger.getRootLogger().warn(message); - validateLogContainsMessage(monitor, message); + validateLogContainsMessage(_monitor, message); String LOG_RESET_TEXT = "Log Monitor Reset"; - validateLogDoesNotContainsMessage(monitor, LOG_RESET_TEXT); + validateLogDoesNotContainsMessage(_monitor, LOG_RESET_TEXT); - monitor.reset(); + _monitor.reset(); - assertEquals("", monitor.readFile()); + assertEquals("", _monitor.readFile()); } public void testRead() throws IOException { - LogMonitor monitor = new LogMonitor(); - String message = getName() + ": Test Message"; Logger.getRootLogger().warn(message); - String fileContents = monitor.readFile(); + String fileContents = _monitor.readFile(); assertTrue("Logged message not found when reading file.", fileContents.contains(message)); diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes index 7577ad8da1..36e7317e31 100644 --- a/qpid/java/test-profiles/010Excludes +++ b/qpid/java/test-profiles/010Excludes @@ -4,6 +4,7 @@ org.apache.qpid.client.ResetMessageListenerTest#* //These tests are for the java broker org.apache.qpid.server.security.acl.SimpleACLTest#* org.apache.qpid.server.plugins.PluginTest#* +org.apache.qpid.server.BrokerStartupTest#* // This test is not finished org.apache.qpid.test.testcases.TTLTest#* @@ -82,3 +83,9 @@ org.apache.qpid.server.logging.* // CPP Broker does not have a JMX interface to test org.apache.qpid.management.jmx.* + +// 0-10 is not supported by the MethodRegistry +org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#* + +// QPID-2084 : this test needs more work for 0-10 +org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#* |