summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/broker/bin/qpid-server9
-rw-r--r--qpid/java/broker/etc/log4j.xml2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/StartupRootMessageLogger.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallFactory.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java51
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java24
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java91
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java54
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java18
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java6
-rw-r--r--qpid/java/doc/broker-priority-queue-subscription.diabin0 -> 2991 bytes
-rw-r--r--qpid/java/doc/broker-queue-subscription.diabin0 -> 2129 bytes
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java7
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java41
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java2
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java4
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/connection/ConnectionOperationsTabControl.java60
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/ExchangeOperationsTabControl.java4
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/HeadersExchangeOperationsTabControl.java144
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java4
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java22
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/vhost/VHostTabControl.java4
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/resources/win32-win32-x86/qpidmc.ini22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java28
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java65
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java88
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java119
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java59
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java20
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java73
-rw-r--r--qpid/java/test-profiles/010Excludes7
46 files changed, 816 insertions, 353 deletions
diff --git a/qpid/java/broker/bin/qpid-server b/qpid/java/broker/bin/qpid-server
index e5a9e998e2..738fc6e084 100755
--- a/qpid/java/broker/bin/qpid-server
+++ b/qpid/java/broker/bin/qpid-server
@@ -26,14 +26,19 @@ fi
# Set classpath to include Qpid jar with all required jars in manifest
QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/bdbstore-launch.jar
+# Default Log4j to append to its log file
+if [ -z "$QPID_LOG_APPEND" ]; then
+ export QPID_LOG_APPEND="true"
+fi
+
# Set other variables used by the qpid-run script before calling
export JAVA=java \
JAVA_VM=-server \
JAVA_MEM=-Xmx1024m \
JAVA_GC="-XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \
QPID_CLASSPATH=$QPID_LIBS \
- QPID_RUN_LOG=2
+ QPID_RUN_LOG=2
-QPID_OPTS="$QPID_OPTS -Damqj.read_write_pool_size=32"
+QPID_OPTS="$QPID_OPTS -Damqj.read_write_pool_size=32 -DQPID_LOG_APPEND=$QPID_LOG_APPEND"
. qpid-run org.apache.qpid.server.Main "$@"
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml
index a395d0fd56..8ca43ededd 100644
--- a/qpid/java/broker/etc/log4j.xml
+++ b/qpid/java/broker/etc/log4j.xml
@@ -50,7 +50,7 @@
<appender class="org.apache.log4j.FileAppender" name="FileAppender">
<param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/>
- <param name="Append" value="false"/>
+ <param name="Append" value="${QPID_LOG_APPEND}"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index fc667db17b..c5f5cd05e1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -204,11 +204,21 @@ public class HeadersExchange extends AbstractExchange
for (int i = 0; i < bindings.length; i++)
{
String[] keyAndValue = bindings[i].split("=");
- if (keyAndValue == null || keyAndValue.length < 2)
+ if (keyAndValue == null || keyAndValue.length == 0 || keyAndValue.length > 2)
{
throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
}
- bindingMap.setString(keyAndValue[0], keyAndValue[1]);
+
+ if(keyAndValue.length ==1)
+ {
+ //no value was given, only a key. Use an empty value
+ //to signal match on key presence alone
+ bindingMap.setString(keyAndValue[0], "");
+ }
+ else
+ {
+ bindingMap.setString(keyAndValue[0], keyAndValue[1]);
+ }
}
_bindings.add(new Registration(new HeadersBinding(bindingMap), queue));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/StartupRootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/StartupRootMessageLogger.java
index 0dffde50fa..bfb122985b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/StartupRootMessageLogger.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/StartupRootMessageLogger.java
@@ -39,4 +39,11 @@ public class StartupRootMessageLogger extends RootMessageLoggerImpl
return true;
}
+ @Override
+ public boolean isMessageEnabled(LogActor actor)
+ {
+ return true;
+ }
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
index 42b3b05ac5..aea9ab43ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
@@ -379,6 +379,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
try
{
_cs.stop();
+ CurrentActor.get().message(ManagementConsoleMessages.MNG_1003("RMI Registry", _cs.getAddress().getPort() - PORT_EXPORT_OFFSET));
+ CurrentActor.get().message(ManagementConsoleMessages.MNG_1003("RMI ConnectorServer", _cs.getAddress().getPort()));
}
catch (IOException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
index f7e537b02b..a6fae053c2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
@@ -120,10 +120,4 @@ public abstract class BasicACLPlugin implements ACLPlugin
// no-op
}
- public boolean supportsTag(String name)
- {
- // This plugin doesn't support any tags
- return false;
- }
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallFactory.java
deleted file mode 100644
index a1a399e5bf..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.security.access.plugins.network;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.security.access.ACLPlugin;
-import org.apache.qpid.server.security.access.ACLPluginFactory;
-
-public class FirewallFactory implements ACLPluginFactory
-{
-
- @Override
- public ACLPlugin newInstance(Configuration config) throws ConfigurationException
- {
- FirewallPlugin plugin = new FirewallPlugin();
- plugin.setConfiguration(config);
- return plugin;
- }
-
- @Override
- public boolean supportsTag(String name)
- {
- return name.equals("firewall");
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index b34ef1c382..72d6afc65c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -599,7 +599,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
{
_stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString()));
}
else
{
@@ -612,9 +611,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
{
_stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString()));
}
}
+ CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString()));
}
public State getState()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
index f62b0c6241..317dee2b47 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
@@ -46,17 +46,4 @@ public class ExchangeDenier extends AllowAll
{
return AuthzResult.DENIED;
}
-
- @Override
- public String getPluginName()
- {
- return getClass().getSimpleName();
- }
-
- @Override
- public boolean supportsTag(String name)
- {
- return name.equals("exchangeDenier");
- }
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
index 05ac3dca9e..6bae0166d1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
@@ -39,9 +39,4 @@ public class AMQAuthenticationException extends AMQException
{
super(error, msg, cause);
}
- public boolean isHardError()
- {
- return true;
- }
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 118be75705..2e3e417c95 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -65,6 +65,7 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -205,9 +206,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
- protected static final boolean DECLARE_QUEUES =
+ protected final boolean DECLARE_QUEUES =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
- protected static final boolean DECLARE_EXCHANGES =
+ protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
/** System property to enable strict AMQP compliance. */
@@ -629,6 +630,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public void close(long timeout) throws JMSException
{
+ close(timeout, true);
+ }
+
+ private void close(long timeout, boolean sendClose) throws JMSException
+ {
if (_logger.isInfoEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -654,9 +660,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// If the connection is open or we are in the process
// of closing the connection then send a cance
// no point otherwise as the connection will be gone
- if (!_connection.isClosed() || _connection.isClosing())
+ if (!_connection.isClosed() || _connection.isClosing())
{
- sendClose(timeout);
+ if (sendClose)
+ {
+ sendClose(timeout);
+ }
}
}
catch (AMQException e)
@@ -712,25 +721,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!_closed.getAndSet(true))
{
- synchronized (getFailoverMutex())
+ synchronized (_messageDeliveryLock)
{
- synchronized (_messageDeliveryLock)
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ AMQException amqe;
+ if (e instanceof AMQException)
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
- {
- amqe = new AMQException("Closing session forcibly", e);
- }
-
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
}
+
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
}
}
}
@@ -1737,6 +1743,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
+ if (e instanceof AMQChannelClosedException)
+ {
+ close(-1, false);
+ }
+
JMSException ex = new JMSException("Error registering consumer: " + e);
ex.setLinkedException(e);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index fa4e08f62b..d7196c0abb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -33,6 +33,7 @@ import org.apache.qpid.client.message.*;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
@@ -116,12 +117,23 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public void sendClose(long timeout) throws AMQException, FailoverException
{
- getProtocolHandler().closeSession(this);
- getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
- ChannelCloseOkBody.class, timeout);
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully.
+ // we also need to check the state manager for 08/09 as the
+ // _connection variable may not be updated in time by the error receiving
+ // thread.
+ // We can't close the session if we are alreadying in the process of
+ // closing/closed the connection.
+
+ if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
+ || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING)))
+ {
+
+ getProtocolHandler().closeSession(this);
+ getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
+ new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
+ ChannelCloseOkBody.class, timeout);
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully.
+ }
}
public void sendCommit() throws AMQException, FailoverException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 2b6745ebe4..2cf19bf391 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -32,7 +32,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +47,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann
}
public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId)
- throws AMQException
+ throws AMQException
{
_logger.debug("ChannelClose method received");
@@ -59,52 +58,62 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
-
-
ChannelCloseOkBody body = session.getMethodRegistry().createChannelCloseOkBody();
AMQFrame frame = body.generateFrame(channelId);
session.writeFrame(frame);
-
- if (errorCode != AMQConstant.REPLY_SUCCESS)
+ try
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason);
- }
-
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- throw new AMQNoConsumersException("Error: " + reason, null, null);
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- throw new AMQNoRouteException("Error: " + reason, null, null);
- }
- else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- _logger.debug("Broker responded with Invalid Argument.");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ }
+
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ throw new AMQNoConsumersException("Error: " + reason, null, null);
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ throw new AMQNoRouteException("Error: " + reason, null, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+ {
+ _logger.debug("Broker responded with Invalid Argument.");
+
+ throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null);
+ }
+ else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+ {
+ _logger.debug("Broker responded with Invalid Routing Key.");
+
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
+ }
+ else
+ {
+
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
+ }
- throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null);
- }
- else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
- {
- _logger.debug("Broker responded with Invalid Routing Key.");
-
- throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
- }
- else
- {
- throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
}
-
}
- // fixme why is this only done when the close is expected...
- // should the above forced closes not also cause a close?
- // ----------
- // Closing the session only when it is expected allows the errors to be processed
- // Calling this here will prevent failover. So we should do this for all exceptions
- // that should never cause failover. Such as authentication errors.
-
- session.channelClosed(channelId, errorCode, String.valueOf(reason));
+ finally
+ {
+ // fixme why is this only done when the close is expected...
+ // should the above forced closes not also cause a close?
+ // ----------
+ // Closing the session only when it is expected allows the errors to be processed
+ // Calling this here will prevent failover. So we should do this for all exceptions
+ // that should never cause failover. Such as authentication errors.
+ // ----
+ // 2009-09-07 - ritchiem
+ // calling channelClosed will only close this session and will not
+ // prevent failover. If we don't close the session here then we will
+ // have problems during the session close as it will attempt to
+ // close the session that the broker has closed,
+
+ session.channelClosed(channelId, errorCode, String.valueOf(reason));
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
index e639a33450..e40cafd72f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
@@ -40,7 +40,6 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<C
}
public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId)
- throws AMQException
{
session.getStateManager().changeState(AMQState.CONNECTION_OPEN);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index e4e58c317d..287b5957a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -45,7 +45,6 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con
{ }
public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId)
- throws AMQException
{
_logger.debug("ConnectionTune frame received");
final MethodRegistry methodRegistry = session.getMethodRegistry();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
index f2aca58deb..8782e00a12 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
@@ -52,7 +52,7 @@ public class AMQIoTransportProtocolSession extends AMQProtocolSession
}
@Override
- public void closeProtocolSession(boolean waitLast) throws AMQException
+ public void closeProtocolSession(boolean waitLast)
{
_ioSender.close();
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index e3a1a82dc4..ab3ff8ecb0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -259,7 +259,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/**
* Called when we want to create a new IoTransport session
- * @param brokerDetail
+ * @param brokerDetail
*/
public void createIoTransportSession(BrokerDetails brokerDetail)
{
@@ -271,7 +271,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
brokerDetail.useSSL());
_protocolSession.init();
}
-
+
/**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
@@ -433,12 +433,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param e the exception to propagate
*
* @see #propagateExceptionToFrameListeners
- * @see #propagateExceptionToStateWaiters
*/
public void propagateExceptionToAllWaiters(Exception e)
{
+ getStateManager().error(e);
propagateExceptionToFrameListeners(e);
- propagateExceptionToStateWaiters(e);
}
/**
@@ -469,22 +468,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
- /**
- * This caters for the case where we only need to propogate an exception to the the state manager to interupt any
- * thing waiting for a state change.
- *
- * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement.
- *
- * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal
- * cases {@link #propagateExceptionToAllWaiters} would be the correct choice.
- *
- * @param e the exception to propagate
- */
- public void propagateExceptionToStateWaiters(Exception e)
- {
- getStateManager().error(e);
- }
-
public void notifyFailoverStarting()
{
// Set the last exception in the sync block to ensure the ordering with add.
@@ -601,7 +584,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
}
-
+
final long sentMessages = _messagesOut++;
final boolean debug = _logger.isDebugEnabled();
@@ -667,7 +650,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
throw _lastFailoverException;
}
- if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED)
+ if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED ||
+ _stateManager.getCurrentState() == AMQState.CONNECTION_CLOSING)
{
Exception e = _stateManager.getLastException();
if (e != null)
@@ -733,25 +717,31 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void closeConnection(long timeout) throws AMQException
{
- getStateManager().changeState(AMQState.CONNECTION_CLOSING);
-
ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
new AMQShortString("JMS client is closing the connection."), 0, 0);
final AMQFrame frame = body.generateFrame(0);
- try
- {
- syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _protocolSession.closeProtocolSession();
- }
- catch (AMQTimeoutException e)
+ //If the connection is already closed then don't do a syncWrite
+ if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
_protocolSession.closeProtocolSession(false);
}
- catch (FailoverException e)
+ else
{
- _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+ try
+ {
+ syncWrite(frame, ConnectionCloseOkBody.class, timeout);
+ _protocolSession.closeProtocolSession();
+ }
+ catch (AMQTimeoutException e)
+ {
+ _protocolSession.closeProtocolSession(false);
+ }
+ catch (FailoverException e)
+ {
+ _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 5e12a5e6f8..0e872170aa 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -410,12 +410,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
}
- public void closeProtocolSession() throws AMQException
+ public void closeProtocolSession()
{
closeProtocolSession(true);
}
- public void closeProtocolSession(boolean waitLast) throws AMQException
+ public void closeProtocolSession(boolean waitLast)
{
_logger.debug("Waiting for last write to join.");
if (waitLast && (_lastWriteFuture != null))
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index f8645139f2..70d4697f2c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.io.IOException;
/**
* The state manager is responsible for managing the state of the protocol session. <p/>
@@ -86,7 +87,7 @@ public class AMQStateManager implements AMQMethodListener
return _currentState;
}
- public void changeState(AMQState newState) throws AMQException
+ public void changeState(AMQState newState)
{
_logger.debug("State changing to " + newState + " from old state " + _currentState);
@@ -136,6 +137,22 @@ public class AMQStateManager implements AMQMethodListener
*/
public void error(Exception error)
{
+ if (error instanceof AMQException)
+ {
+ // AMQException should be being notified before closing the
+ // ProtocolSession. Which will change the State to CLOSED.
+ // if we have a hard error.
+ if (((AMQException)error).isHardError())
+ {
+ changeState(AMQState.CONNECTION_CLOSING);
+ }
+ }
+ else
+ {
+ // Be on the safe side here and mark the connection closed
+ changeState(AMQState.CONNECTION_CLOSED);
+ }
+
if (_waiters.size() == 0)
{
_logger.error("No Waiters for error saving as last error:" + error.getMessage());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index bddbc329ab..ee7fc533a3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -22,10 +22,11 @@ package org.apache.qpid.client.util;
import java.util.Iterator;
import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
* control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
@@ -36,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class FlowControllingBlockingQueue
{
+ private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
+
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final Queue _queue = new ConcurrentLinkedQueue();
@@ -46,6 +49,8 @@ public class FlowControllingBlockingQueue
/** We require a separate count so we can track whether we have reached the threshold */
private int _count;
+
+ private boolean disableFlowControl;
public boolean isEmpty()
{
@@ -69,6 +74,10 @@ public class FlowControllingBlockingQueue
_flowControlHighThreshold = highThreshold;
_flowControlLowThreshold = lowThreshold;
_listener = listener;
+ if (highThreshold == 0)
+ {
+ disableFlowControl = true;
+ }
}
public Object take() throws InterruptedException
@@ -84,7 +93,7 @@ public class FlowControllingBlockingQueue
}
}
}
- if (_listener != null)
+ if (!disableFlowControl && _listener != null)
{
synchronized (_listener)
{
@@ -93,6 +102,7 @@ public class FlowControllingBlockingQueue
_listener.underThreshold(_count);
}
}
+
}
return o;
@@ -106,7 +116,7 @@ public class FlowControllingBlockingQueue
notifyAll();
}
- if (_listener != null)
+ if (!disableFlowControl && _listener != null)
{
synchronized (_listener)
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
index ce79080e97..da44822ec3 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
@@ -85,7 +85,7 @@ public class MockAMQConnection extends AMQConnection
}
@Override
- public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException
{
_connected = true;
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN);
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index 10ec220d9e..fc7f8148f0 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -200,15 +200,8 @@ public class AMQProtocolHandlerTest extends TestCase
_handler.getStateManager().error(trigger);
_logger.info("Setting state to be CONNECTION_CLOSED.");
- try
- {
- _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- }
- catch (AMQException e)
- {
- _logger.error("Unable to change the state to closed.", e);
- fail("Unable to change the state to closed due to :"+e.getMessage());
- }
+
+ _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
_logger.info("Firing exception");
_handler.propagateExceptionToFrameListeners(trigger);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index 8df3644929..e34103a944 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -30,7 +30,6 @@ import java.nio.ByteBuffer;
import javax.net.ssl.SSLEngine;
-import org.apache.log4j.Logger;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
@@ -59,6 +58,9 @@ import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.NetworkDriverConfiguration;
import org.apache.qpid.transport.OpenException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
@@ -80,7 +82,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
private WriteFuture _lastWriteFuture;
- private static final Logger _logger = Logger.getLogger(MINANetworkDriver.class);
+ private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class);
public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO)
{
diff --git a/qpid/java/doc/broker-priority-queue-subscription.dia b/qpid/java/doc/broker-priority-queue-subscription.dia
new file mode 100644
index 0000000000..2289899435
--- /dev/null
+++ b/qpid/java/doc/broker-priority-queue-subscription.dia
Binary files differ
diff --git a/qpid/java/doc/broker-queue-subscription.dia b/qpid/java/doc/broker-queue-subscription.dia
new file mode 100644
index 0000000000..d146ad136d
--- /dev/null
+++ b/qpid/java/doc/broker-queue-subscription.dia
Binary files differ
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java
index c3348b32f0..d88e0f38bb 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/ClientListener.java
@@ -59,7 +59,12 @@ public class ClientListener implements NotificationListener
else if (JMXConnectionNotification.FAILED.equals(type))
{
ApplicationRegistry.serverConnectionClosed(server);
- MBeanUtility.printOutput("Recieved notification from " + server.getName() + ": " + type );
+ MBeanUtility.printOutput("JMX Connection to " + server.getName() + " failed.");
+ }
+ else if (JMXConnectionNotification.CLOSED.equals(type))
+ {
+ ApplicationRegistry.serverConnectionClosed(server);
+ MBeanUtility.printOutput("JMX Connection to " + server.getName() + " was closed.");
}
}
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
index 2b459c858f..ca07e6acf4 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java
@@ -22,6 +22,7 @@ package org.apache.qpid.management.ui.jmx;
import static org.apache.qpid.management.ui.Constants.ALL;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -29,7 +30,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import javax.management.ListenerNotFoundException;
import javax.management.MBeanInfo;
import javax.management.MBeanServerConnection;
import javax.management.Notification;
@@ -116,25 +116,54 @@ public class JMXServerRegistry extends ServerRegistry
* removes all listeners from the mbean server. This is required when user
* disconnects the Qpid server connection
*/
- public void closeServerConnection() throws Exception
+ public void closeServerConnection() throws IOException
{
try
{
+ //remove the listener from the JMXConnector
if (_jmxc != null && _clientListener != null)
+ {
_jmxc.removeConnectionNotificationListener(_clientListener);
+ }
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
+ try
+ {
+ //remove the listener from the MBeanServerDelegate MBean
if (_mbsc != null && _clientListener != null)
+ {
_mbsc.removeNotificationListener(_serverObjectName, _clientListener);
+ }
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
- // remove mbean notification listeners
+ if (_mbsc != null && _clientListener != null)
+ {
+ //remove any listeners from the Qpid MBeans
for (String mbeanName : _subscribedNotificationMap.keySet())
{
- _mbsc.removeNotificationListener(new ObjectName(mbeanName), _notificationListener);
+ try
+ {
+ _mbsc.removeNotificationListener(new ObjectName(mbeanName), _notificationListener);
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
}
}
- catch (ListenerNotFoundException ex)
+
+ //close the JMXConnector
+ if (_jmxc != null)
{
- MBeanUtility.printOutput(ex.toString());
+ _jmxc.close();
}
}
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java
index 2408faae3a..f21647b2d2 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java
@@ -882,7 +882,7 @@ public class AttributesTabControl extends TabControl
{
attribute = (AttributeData) element;
if (attribute.isWritable())
- return Display.getCurrent().getSystemColor(SWT.COLOR_DARK_BLUE);
+ return Display.getCurrent().getSystemColor(SWT.COLOR_BLUE);
else
return Display.getCurrent().getSystemColor(SWT.COLOR_BLACK);
}
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
index 0d290ab1c4..056d365f8e 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
@@ -790,9 +790,11 @@ public class NavigationView extends ViewPart
return;
}
- serverRegistry.closeServerConnection();
// Add server to the closed server list and the worker thread will remove the server from required places.
ApplicationRegistry.serverConnectionClosed(managedServer);
+
+ //close the connection
+ serverRegistry.closeServerConnection();
}
/**
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/connection/ConnectionOperationsTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/connection/ConnectionOperationsTabControl.java
index e981ec1c3c..f82d37dcd1 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/connection/ConnectionOperationsTabControl.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/connection/ConnectionOperationsTabControl.java
@@ -28,10 +28,13 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularDataSupport;
+import org.apache.qpid.management.ui.ApplicationRegistry;
import org.apache.qpid.management.ui.ManagedBean;
+import org.apache.qpid.management.ui.ServerRegistry;
import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.management.ui.jmx.JMXManagedObject;
import org.apache.qpid.management.ui.jmx.MBeanUtility;
+import org.apache.qpid.management.ui.views.MBeanView;
import org.apache.qpid.management.ui.views.TabControl;
import org.apache.qpid.management.ui.views.ViewUtility;
import org.eclipse.jface.viewers.ISelectionChangedListener;
@@ -43,6 +46,8 @@ import org.eclipse.jface.viewers.TableViewer;
import org.eclipse.jface.viewers.Viewer;
import org.eclipse.jface.viewers.ViewerSorter;
import org.eclipse.swt.SWT;
+import org.eclipse.swt.events.MouseEvent;
+import org.eclipse.swt.events.MouseListener;
import org.eclipse.swt.events.SelectionAdapter;
import org.eclipse.swt.events.SelectionEvent;
import org.eclipse.swt.graphics.Image;
@@ -55,6 +60,8 @@ import org.eclipse.swt.widgets.Group;
import org.eclipse.swt.widgets.TabFolder;
import org.eclipse.swt.widgets.Table;
import org.eclipse.swt.widgets.TableColumn;
+import org.eclipse.ui.IWorkbenchWindow;
+import org.eclipse.ui.PlatformUI;
import org.eclipse.ui.forms.widgets.Form;
import org.eclipse.ui.forms.widgets.FormToolkit;
@@ -199,6 +206,8 @@ public class ConnectionOperationsTabControl extends TabControl
_tableViewer.setContentProvider(new ContentProviderImpl());
_tableViewer.setLabelProvider(new LabelProviderImpl());
_tableViewer.setSorter(tableSorter);
+ _table.setSortColumn(_table.getColumn(0));
+ _table.setSortDirection(SWT.UP);
Composite buttonsComposite = _toolkit.createComposite(viewChannelsGroup);
gridData = new GridData(SWT.RIGHT, SWT.BOTTOM, false, false);
@@ -278,6 +287,19 @@ public class ConnectionOperationsTabControl extends TabControl
}
});
+ //listener for double clicking to open the selection mbean
+ _table.addMouseListener(new MouseListener()
+ {
+ // MouseListener implementation
+ public void mouseDoubleClick(MouseEvent event)
+ {
+ openMBean(_table);
+ }
+
+ public void mouseDown(MouseEvent e){}
+ public void mouseUp(MouseEvent e){}
+ });
+
_tableViewer.addSelectionChangedListener(new ISelectionChangedListener(){
public void selectionChanged(SelectionChangedEvent evt)
{
@@ -465,5 +487,43 @@ public class ConnectionOperationsTabControl extends TabControl
return comparison;
}
}
+
+ private void openMBean(Table table)
+ {
+ int selectionIndex = table.getSelectionIndex();
+
+ if (selectionIndex == -1)
+ {
+ return;
+ }
+
+ CompositeData channelResult = (CompositeData) table.getItem(selectionIndex).getData();
+ String queueName = (String) channelResult.get(DEFAULT_QUEUE);
+
+ if(queueName == null)
+ {
+ return;
+ }
+
+ ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(_mbean);
+ ManagedBean selectedMBean = serverRegistry.getQueue(queueName, _mbean.getVirtualHostName());
+
+ if(selectedMBean == null)
+ {
+ ViewUtility.popupErrorMessage("Error", "Unable to retrieve the selected MBean to open it");
+ return;
+ }
+
+ IWorkbenchWindow window = PlatformUI.getWorkbench().getActiveWorkbenchWindow();
+ MBeanView view = (MBeanView) window.getActivePage().findView(MBeanView.ID);
+ try
+ {
+ view.openMBean(selectedMBean);
+ }
+ catch (Exception ex)
+ {
+ MBeanUtility.handleException(selectedMBean, ex);
+ }
+ }
}
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/ExchangeOperationsTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/ExchangeOperationsTabControl.java
index 35b3e8150c..e3dea6e96b 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/ExchangeOperationsTabControl.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/ExchangeOperationsTabControl.java
@@ -237,6 +237,8 @@ public class ExchangeOperationsTabControl extends TabControl
_keysTableViewer.setContentProvider(new ContentProviderImpl(BINDING_KEY));
_keysTableViewer.setLabelProvider(new LabelProviderImpl(BINDING_KEY));
_keysTableViewer.setSorter(tableSorter);
+ _keysTable.setSortColumn(_keysTable.getColumn(0));
+ _keysTable.setSortDirection(SWT.UP);
_queuesTable = new Table (tablesComposite, SWT.SINGLE | SWT.SCROLL_LINE | SWT.BORDER | SWT.FULL_SELECTION);
@@ -287,6 +289,8 @@ public class ExchangeOperationsTabControl extends TabControl
_queuesTableViewer.setContentProvider(new ContentProviderImpl(QUEUES));
_queuesTableViewer.setLabelProvider(new LabelProviderImpl(QUEUES));
_queuesTableViewer.setSorter(queuesTableSorter);
+ _queuesTable.setSortColumn(_queuesTable.getColumn(0));
+ _queuesTable.setSortDirection(SWT.UP);
_queuesTableViewer.setInput(new String[]{"Select a binding key to view queues"});
//listener for double clicking to open the selection mbean
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/HeadersExchangeOperationsTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/HeadersExchangeOperationsTabControl.java
index 5146bab74c..fcce0e67b6 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/HeadersExchangeOperationsTabControl.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/exchange/HeadersExchangeOperationsTabControl.java
@@ -22,6 +22,7 @@ package org.apache.qpid.management.ui.views.exchange;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import javax.management.MBeanServerConnection;
@@ -48,6 +49,7 @@ import org.eclipse.jface.viewers.TableViewer;
import org.eclipse.jface.viewers.Viewer;
import org.eclipse.jface.viewers.ViewerSorter;
import org.eclipse.swt.SWT;
+import org.eclipse.swt.custom.ScrolledComposite;
import org.eclipse.swt.events.MouseEvent;
import org.eclipse.swt.events.MouseListener;
import org.eclipse.swt.events.SelectionAdapter;
@@ -60,6 +62,7 @@ import org.eclipse.swt.widgets.Combo;
import org.eclipse.swt.widgets.Composite;
import org.eclipse.swt.widgets.Control;
import org.eclipse.swt.widgets.Group;
+import org.eclipse.swt.widgets.Label;
import org.eclipse.swt.widgets.Shell;
import org.eclipse.swt.widgets.TabFolder;
import org.eclipse.swt.widgets.Table;
@@ -182,7 +185,7 @@ public class HeadersExchangeOperationsTabControl extends TabControl
final TableSorter tableSorter = new TableSorter(BINDING_NUM);
String[] titles = {"Binding Number", "Queue Name"};
- int[] bounds = {125, 175};
+ int[] bounds = {135, 175};
for (int i = 0; i < titles.length; i++)
{
final int index = i;
@@ -220,6 +223,8 @@ public class HeadersExchangeOperationsTabControl extends TabControl
_bindingNumberTableViewer.setContentProvider(new ContentProviderImpl(BINDING_NUM));
_bindingNumberTableViewer.setLabelProvider(new LabelProviderImpl(BINDING_NUM));
_bindingNumberTableViewer.setSorter(tableSorter);
+ _bindingNumberTable.setSortColumn(_bindingNumberTable.getColumn(0));
+ _bindingNumberTable.setSortDirection(SWT.UP);
//table of header bindings
_headersTable = new Table (tablesComposite, SWT.SINGLE | SWT.SCROLL_LINE | SWT.BORDER | SWT.FULL_SELECTION);
@@ -272,6 +277,8 @@ public class HeadersExchangeOperationsTabControl extends TabControl
_headersTableViewer.setContentProvider(new ContentProviderImpl(HEADER_BINDINGS));
_headersTableViewer.setLabelProvider(new LabelProviderImpl(HEADER_BINDINGS));
_headersTableViewer.setSorter(queuesTableSorter);
+ _headersTable.setSortColumn(_headersTable.getColumn(0));
+ _headersTable.setSortDirection(SWT.UP);
_headersTableViewer.setInput(new String[]{"Select a binding to view key-value pairs"});
_bindingNumberTableViewer.addSelectionChangedListener(new ISelectionChangedListener(){
@@ -490,25 +497,38 @@ public class HeadersExchangeOperationsTabControl extends TabControl
private void createNewBinding(Shell parent)
{
final Shell shell = ViewUtility.createModalDialogShell(parent, "Create New Binding");
-
- Composite destinationComposite = _toolkit.createComposite(shell, SWT.NONE);
- destinationComposite.setBackground(shell.getBackground());
- destinationComposite.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
- destinationComposite.setLayout(new GridLayout(2,false));
+
+ Composite queueNameComposite = _toolkit.createComposite(shell, SWT.NONE);
+ queueNameComposite.setBackground(shell.getBackground());
+ GridData layoutData = new GridData(SWT.CENTER, SWT.TOP, true, false);
+ layoutData.minimumWidth = 300;
+ queueNameComposite.setLayoutData(layoutData);
+ queueNameComposite.setLayout(new GridLayout(2,false));
- _toolkit.createLabel(destinationComposite,"Queue:").setBackground(shell.getBackground());
- final Combo destinationCombo = new Combo(destinationComposite,SWT.NONE | SWT.READ_ONLY);
+ _toolkit.createLabel(queueNameComposite,"Queue:").setBackground(shell.getBackground());
+ final Combo destinationCombo = new Combo(queueNameComposite,SWT.NONE | SWT.READ_ONLY);
destinationCombo.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
- Composite bindingComposite = _toolkit.createComposite(shell, SWT.NONE);
- bindingComposite.setBackground(shell.getBackground());
- bindingComposite.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
- bindingComposite.setLayout(new GridLayout(2,false));
+ final ScrolledComposite scrolledComposite = new ScrolledComposite(shell, SWT.V_SCROLL);
+ scrolledComposite.setExpandHorizontal(true);
+ scrolledComposite.setLayout(new GridLayout());
+ scrolledComposite.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
+ scrolledComposite.setBackground(shell.getBackground());
+
+ final Composite bindingComposite = _toolkit.createComposite(scrolledComposite, SWT.NONE);
+ bindingComposite.setBackground(scrolledComposite.getBackground());
+ bindingComposite.setLayout(new GridLayout(2,true));
+ layoutData = new GridData(SWT.FILL, SWT.TOP, true, false);
+ bindingComposite.setLayoutData(layoutData);
+ scrolledComposite.setContent(bindingComposite);
+
+ Composite addMoreButtonComp = _toolkit.createComposite(shell);
+ addMoreButtonComp.setBackground(shell.getBackground());
+ addMoreButtonComp.setLayoutData(new GridData(SWT.RIGHT, SWT.FILL, true, true));
+ addMoreButtonComp.setLayout(new GridLayout());
+
+ final Button addMoreButton = _toolkit.createButton(addMoreButtonComp, "Add additional field", SWT.PUSH);
- _toolkit.createLabel(bindingComposite,"Binding:").setBackground(shell.getBackground());
- final Text bindingText = new Text(bindingComposite, SWT.BORDER);
- bindingText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
-
Composite okCancelButtonsComp = _toolkit.createComposite(shell);
okCancelButtonsComp.setBackground(shell.getBackground());
okCancelButtonsComp.setLayoutData(new GridData(SWT.RIGHT, SWT.FILL, true, true));
@@ -532,26 +552,106 @@ public class HeadersExchangeOperationsTabControl extends TabControl
destinationCombo.setItems(queueList.toArray(new String[0]));
}
destinationCombo.select(0);
+
+ final HashMap<Text, Text> headerBindingHashMap = new HashMap<Text, Text>();
+
+ //add headings
+ Label keyLabel = _toolkit.createLabel(bindingComposite,"Key:");
+ keyLabel.setBackground(bindingComposite.getBackground());
+ keyLabel.setLayoutData(new GridData(SWT.CENTER, SWT.TOP, true, false));
+
+ Label valueLabel = _toolkit.createLabel(bindingComposite,"Value:");
+ valueLabel.setBackground(bindingComposite.getBackground());
+ valueLabel.setLayoutData(new GridData(SWT.CENTER, SWT.TOP, true, false));
+
+ //add the x-match key by default and offer a comobo to select its value
+ final Text xmatchKeyText = new Text(bindingComposite, SWT.BORDER);
+ xmatchKeyText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
+ xmatchKeyText.setText("x-match");
+ xmatchKeyText.setEditable(false);
+
+ final Combo xmatchValueCombo = new Combo(bindingComposite,SWT.NONE | SWT.READ_ONLY);
+ xmatchValueCombo.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
+ xmatchValueCombo.setItems(new String[]{"any", "all"});
+ xmatchValueCombo.select(0);
+
+ //make some empty key-value fields
+ for(int i=0; i < 4; i++)
+ {
+ Text keyText = new Text(bindingComposite, SWT.BORDER);
+ Text valueText = new Text(bindingComposite, SWT.BORDER);
+ keyText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
+ valueText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
+
+ headerBindingHashMap.put(keyText, valueText);
+ }
+ bindingComposite.setSize(bindingComposite.computeSize(SWT.DEFAULT, SWT.DEFAULT));
+
+ //allow adding more fields for additional key-value pairs
+ addMoreButton.addSelectionListener(new SelectionAdapter()
+ {
+ public void widgetSelected(SelectionEvent e)
+ {
+ Text keyText = new Text(bindingComposite, SWT.BORDER);
+ Text valueText = new Text(bindingComposite, SWT.BORDER);
+ keyText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
+ valueText.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false));
+
+ headerBindingHashMap.put(keyText, valueText);
+
+ bindingComposite.setSize(bindingComposite.computeSize(SWT.DEFAULT, SWT.DEFAULT));
+ bindingComposite.layout(true);
+ scrolledComposite.layout(true);
+ }
+ });
okButton.addSelectionListener(new SelectionAdapter()
{
public void widgetSelected(SelectionEvent e)
{
- String binding = bindingText.getText();
+ String xMatchString = xmatchValueCombo.getText();
- if (binding == null || binding.length() == 0)
+ String destQueue = destinationCombo.getItem(destinationCombo.getSelectionIndex()).toString();
+
+ StringBuffer bindingValue = new StringBuffer();
+
+ //insert the x-match key-value pair
+ if (xMatchString.equalsIgnoreCase("any"))
{
- ViewUtility.popupErrorMessage("Create New Binding", "Please enter a valid binding");
- return;
+ bindingValue.append("x-match=any");
+ }
+ else
+ {
+ bindingValue.append("x-match=all");
}
- String destQueue = destinationCombo.getItem(destinationCombo.getSelectionIndex()).toString();
+ //insert the other key-value pairs
+ for (Text keyText : headerBindingHashMap.keySet())
+ {
+
+ String key = keyText.getText();
+ if(key == null || key.length() == 0)
+ {
+ continue;
+ }
+
+ Text valueText = headerBindingHashMap.get(keyText);
+ String value = valueText.getText();
+
+ bindingValue.append(",");
+ bindingValue.append(key + "=");
+ //empty values are permitted, signalling only key-presence is required
+ if(value != null && value.length() > 0)
+ {
+ bindingValue.append(value);
+ }
+ }
shell.dispose();
try
{
- _emb.createNewBinding(destQueue, binding);
+ _emb.createNewBinding(destQueue, bindingValue.toString());
ViewUtility.operationResultFeedback(null, "Created new Binding", null);
}
catch (Exception e4)
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java
index 3e8a5da5b5..1b1d08aa67 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java
@@ -197,8 +197,8 @@ public class ConfigurationFileTabControl extends TabControl
"NOTE: These options modify the configuration file. " +
"Changes only take effect automatically if LogWatch is enabled.");
Label noteLabel2 = _toolkit.createLabel(_headerComposite,
- "A Logger set to a non-inherited Level in the Runtime tab " +
- "will retain that value after the configuration is reloaded.");
+ "A child Logger set to a non-inherited Level in the Runtime tab " +
+ "will retain that value after the file is reloaded.");
GridData gridData = new GridData(SWT.FILL, SWT.FILL, false, true);
noteLabel.setLayoutData(gridData);
gridData = new GridData(SWT.FILL, SWT.FILL, false, true);
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java
index c7c7a1791a..43d2cfe204 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/queue/QueueOperationsTabControl.java
@@ -299,8 +299,8 @@ public class QueueOperationsTabControl extends TabControl
//message table
Composite tableAndButtonsComposite = _toolkit.createComposite(messagesGroup);
gridData = new GridData(SWT.FILL, SWT.FILL, true, true);
- gridData.minimumHeight = 220;
- gridData.heightHint = 220;
+ gridData.minimumHeight = 180;
+ gridData.heightHint = 180;
tableAndButtonsComposite.setLayoutData(gridData);
tableAndButtonsComposite.setLayout(new GridLayout(2,false));
@@ -358,6 +358,8 @@ public class QueueOperationsTabControl extends TabControl
_tableViewer.setContentProvider(new ContentProviderImpl());
_tableViewer.setLabelProvider(new LabelProviderImpl());
_tableViewer.setSorter(tableSorter);
+ _table.setSortColumn(_table.getColumn(0));
+ _table.setSortDirection(SWT.UP);
//Side Buttons
Composite buttonsComposite = _toolkit.createComposite(tableAndButtonsComposite);
@@ -492,12 +494,12 @@ public class QueueOperationsTabControl extends TabControl
headerEtcComposite.setLayoutData(gridData);
headerEtcComposite.setLayout(new GridLayout());
- final Text headerText = new Text(headerEtcComposite, SWT.WRAP | SWT.BORDER );
+ final Text headerText = new Text(headerEtcComposite, SWT.WRAP | SWT.BORDER | SWT.V_SCROLL);
headerText.setText("Select a message to view its header.");
headerText.setEditable(false);
data = new GridData(SWT.LEFT, SWT.TOP, false, false);
- data.minimumHeight = 230;
- data.heightHint = 230;
+ data.minimumHeight = 210;
+ data.heightHint = 210;
data.minimumWidth = 500;
data.widthHint = 500;
headerText.setLayoutData(data);
@@ -570,10 +572,16 @@ public class QueueOperationsTabControl extends TabControl
String[] msgHeader = (String[]) selectedMsg.get(MSG_HEADER);
headerText.setText("");
String lineSeperator = System.getProperty("line.separator");
- for(String s: msgHeader)
+ int size = msgHeader.length;
+ for(int i=0; i < size; i++)
{
- headerText.append(s + lineSeperator);
+ headerText.append(msgHeader[i]);
+ if(!(i == size - 1))
+ {
+ headerText.append(lineSeperator);
+ }
}
+ headerText.setSelection(0);
}
if (_table.getSelectionCount() > 1)
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/vhost/VHostTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/vhost/VHostTabControl.java
index fef1e9f887..3b03aeaff1 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/vhost/VHostTabControl.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/vhost/VHostTabControl.java
@@ -205,6 +205,8 @@ public class VHostTabControl extends TabControl
_queueTableViewer.setContentProvider(new ContentProviderImpl());
_queueTableViewer.setLabelProvider(new LabelProviderImpl());
_queueTableViewer.setSorter(tableSorter);
+ _queueTable.setSortColumn(_queueTable.getColumn(0));
+ _queueTable.setSortDirection(SWT.UP);
Composite queuesRightComposite = _toolkit.createComposite(queuesGroup);
gridData = new GridData(SWT.FILL, SWT.FILL, false, true);
@@ -314,6 +316,8 @@ public class VHostTabControl extends TabControl
_exchangeTableViewer.setContentProvider(new ContentProviderImpl());
_exchangeTableViewer.setLabelProvider(new LabelProviderImpl());
_exchangeTableViewer.setSorter(exchangeTableSorter);
+ _exchangeTable.setSortColumn(_exchangeTable.getColumn(0));
+ _exchangeTable.setSortDirection(SWT.UP);
Composite exchangesRightComposite = _toolkit.createComposite(exchangesGroup);
gridData = new GridData(SWT.FILL, SWT.FILL, false, true);
diff --git a/qpid/java/management/eclipse-plugin/src/main/resources/win32-win32-x86/qpidmc.ini b/qpid/java/management/eclipse-plugin/src/main/resources/win32-win32-x86/qpidmc.ini
index 9e3de042d5..312580769e 100644
--- a/qpid/java/management/eclipse-plugin/src/main/resources/win32-win32-x86/qpidmc.ini
+++ b/qpid/java/management/eclipse-plugin/src/main/resources/win32-win32-x86/qpidmc.ini
@@ -23,14 +23,14 @@
-XX:MaxPermSize=256m
-Dosgi.requiredJavaVersion=1.5
-Declipse.consoleLog=true
-
-#===============================================
-# SSL trust store configuration options.
-#===============================================
-
-# Uncomment lines below to specify custom truststore for server SSL
-# certificate verification, eg when using self-signed server certs.
-#
-#-Djavax.net.ssl.trustStore=<path.to.truststore>
-#-Djavax.net.ssl.trustStorePassword=<truststore.password>
-
+
+#===============================================
+# SSL trust store configuration options.
+#===============================================
+
+# Uncomment lines below to specify custom truststore for server SSL
+# certificate verification, eg when using self-signed server certs.
+#
+#-Djavax.net.ssl.trustStore=<path.to.truststore>
+#-Djavax.net.ssl.trustStorePassword=<truststore.password>
+
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java
index 7a2266902b..b4ba6e8156 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java
@@ -29,6 +29,8 @@ import org.apache.qpid.server.logging.AbstractTestLogging;
import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanServerConnection;
@@ -38,6 +40,8 @@ import javax.management.remote.JMXConnector;
import java.io.IOException;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* Test class to test if any change in the broker JMX code is affesting the management console
@@ -161,6 +165,23 @@ public class ManagementActorLoggingTest extends AbstractTestLogging
//Create a connection to the broker
Connection connection = getConnection();
+ // Monitor the connection for an exception being thrown
+ // this should be a DisconnectionException but it is not this tests
+ // job to valiate that. Only use the exception as a synchronisation
+ // to check the log file for the Close message
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ //Failover being attempted.
+ exceptionReceived.countDown();
+ }
+ });
+
+ //Remove the connection close from any 0-10 connections
+ _monitor.reset();
+
// Get all active AMQP connections
AllObjects allObject = new AllObjects(_mbsc);
allObject.querystring = "org.apache.qpid:type=VirtualHost.Connection,*";
@@ -175,16 +196,17 @@ public class ManagementActorLoggingTest extends AbstractTestLogging
newProxyInstance(_mbsc, connectionName,
ManagedConnection.class, false);
- //Remove the connection close from any 0-10 connections
- _monitor.reset();
//Close the connection
mangedConnection.closeConnection();
+ //Wait for the connection to close
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
+
//Validate results
List<String> results = _monitor.findMatches("CON-1002");
-
assertEquals("Unexpected Connection Close count", 1, results.size());
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
index d7209c5660..5dd56fb0f9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
@@ -29,6 +29,7 @@ import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
+import javax.jms.Message;
import java.io.IOException;
import java.util.List;
@@ -327,22 +328,45 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
int PREFETCH = 15;
//Create new session with small prefetch
- _session = ((AMQConnection) _connection).createSession(true, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+ _session = ((AMQConnection) _connection).createSession(true, Session.SESSION_TRANSACTED, PREFETCH);
MessageConsumer consumer = _session.createConsumer(_queue);
_connection.start();
+ //Start the dispatcher & Unflow the channel.
+ consumer.receiveNoWait();
+
//Fill the prefetch and two extra so that our receive bellow allows the
- // subscription to become active then return to a suspended state.
- sendMessage(_session, _queue, 17);
+ // subscription to become active
+ // Previously we set this to 17 so that it would return to a suspended
+ // state. However, testing has shown that the state change can occur
+ // sufficiently quickly that logging does not occur consistently enough
+ // for testing.
+ int SEND_COUNT = 16;
+ sendMessage(_session, _queue, SEND_COUNT);
_session.commit();
// Retreive the first message, and start the flow of messages
- assertNotNull("First message not retreived", consumer.receive(1000));
+ Message msg = consumer.receive(1000);
+ assertNotNull("First message not retreived", msg);
_session.commit();
-
- _connection.close();
+ // Drain the queue to ensure there is time for the ACTIVE log message
+ // Check that we can received all the messages
+ int receivedCount = 0;
+ while (msg != null)
+ {
+ receivedCount++;
+ msg = consumer.receive(1000);
+ _session.commit();
+ }
+
+ //Validate we received all the messages
+ assertEquals("Not all sent messages received.", SEND_COUNT, receivedCount);
+
+ // Fill the queue again to suspend the consumer
+ sendMessage(_session, _queue, SEND_COUNT);
+ _session.commit();
//Validate
List<String> results = _monitor.findMatches("SUB-1003");
@@ -350,15 +374,13 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
try
{
// Validation expects three messages.
- // The first will be logged by the QueueActor as part of the processQueue thread
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
- // The second will be by the connnection as it acknowledges and activates the subscription
-// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
- // The final one can be the subscription suspending as part of the SubFlushRunner or the processQueue thread
- // As a result validating the actor is more complicated and doesn't add anything. The goal of this test is
- // to ensure the State is correct not that a particular Actor performs the logging.
-// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+ // The Actor can be any one of the following depending on the exactly what is going on on the broker.
+ // Ideally we would test that we can get all of them but setting up
+ // the timing to do this in a consistent way is not benefitial.
+ // Ensuring the State is as expected is sufficient.
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State :
+// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State :
+// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State :
assertEquals("Result set not expected size:", 3, results.size());
@@ -367,19 +389,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
String log = getLog(results.get(0));
validateSubscriptionState(log, expectedState);
- // Validate that the logActor is the the queue
- String actor = fromActor(log);
- assertTrue("Actor string does not contain expected queue("
- + _queue.getQueueName() + ") name." + actor,
- actor.contains("qu(" + _queue.getQueueName() + ")"));
-
// After being suspended the subscription should become active.
expectedState = "ACTIVE";
log = getLog(results.get(1));
validateSubscriptionState(log, expectedState);
- // Validate we have a connection Actor
- actor = fromActor(log);
- assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:"));
// Validate that it was re-suspended
expectedState = "SUSPENDED";
@@ -396,6 +409,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
}
throw afe;
}
+ _connection.close();
+
+ //Ensure the queue is drained before the test ends
+ drainQueue(_queue);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
new file mode 100644
index 0000000000..c9810e7304
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * QPID-155
+ *
+ * Test to validate that setting the respective qpid.declare_queues,
+ * qpid.declare_exchanges system properties functions as expected.
+ *
+ */
+public class DynamicQueueExchangeCreateTest extends QpidTestCase
+{
+
+ public void testQueueDeclare() throws Exception
+ {
+ setSystemProperty("qpid.declare_queues", "false");
+
+ Connection connection = getConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(getTestQueueName());
+
+ try
+ {
+ session.createConsumer(queue);
+ fail("JMSException should be thrown as the queue does not exist");
+ }
+ catch (JMSException e)
+ {
+ assertTrue("Exception should be that the queue does not exist :" +
+ e.getMessage(),
+ e.getMessage().contains("does not exist"));
+
+ }
+ }
+
+ public void testExchangeDeclare() throws Exception
+ {
+ setSystemProperty("qpid.declare_exchanges", "false");
+
+ Connection connection = getConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String EXCHANGE_TYPE = "test.direct";
+ Queue queue = session.createQueue("direct://" + EXCHANGE_TYPE + "/queue/queue");
+
+ try
+ {
+ session.createConsumer(queue);
+ fail("JMSException should be thrown as the exchange does not exist");
+ }
+ catch (JMSException e)
+ {
+ assertTrue("Exception should be that the exchange does not exist :" +
+ e.getMessage(),
+ e.getMessage().contains("Exchange " + EXCHANGE_TYPE + " does not exist"));
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
new file mode 100644
index 0000000000..3fb6cd3526
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.Session;
+
+/** QPID-1809
+ *
+ * Race condition on error handling and close logic.
+ *
+ * See most often with SimpleACLTest as this test is the expects the server to
+ * shut the connection/channels. This sort of testing is not performed by many,
+ * if any, of the other system tests.
+ *
+ * The problem is that we have two threads
+ *
+ * MainThread Exception(Mina)Thread
+ * | |
+ * Performs |
+ * ACtion |
+ * | Receives Server
+ * | Close
+ * Blocks for |
+ * Response |
+ * | Starts To Notify
+ * | client
+ * | |
+ * | <----- Notify Main Thread
+ * Notification |
+ * wakes client |
+ * | |
+ * Client then |
+ * processes Error. |
+ * | |
+ * Potentially Attempting Close Channel/Connection
+ * Connection Close
+ *
+ * The two threads both attempt to close the connection but the main thread does
+ * so assuming that the connection is open and valid.
+ *
+ * The Exception thread must modify the connection so that no furter syncWait
+ * commands are performed.
+ *
+ * This test sends an ExchangeDeclare that is Asynchronous and will fail and
+ * so cause a ChannelClose error but we perform a syncWait so that we can be
+ * sure to test that the BlockingWaiter is correctly awoken.
+ *
+ */
+public class JavaServerCloseRaceConditionTest extends QpidTestCase
+{
+ private static final String EXCHANGE_NAME = "NewExchangeNametoFailLookup";
+
+ public void test() throws Exception
+ {
+
+ AMQConnection connection = (AMQConnection) getConnection();
+
+ AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Set no wait true so that we block the connection
+ // Also set a different exchange class string so the attempt to declare
+ // the exchange causes an exchange.
+ ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null,
+ true, false, false, false, true, null);
+
+ AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId());
+
+ try
+ {
+ // block our thread so that can times out
+ connection.getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+ catch (Exception e)
+ {
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
+ }
+
+ try
+ {
+ // Depending on if the notification thread has closed the connection
+ // or not we may get an exception here when we attempt to close the
+ // connection. If we do get one then it should be the same as above
+ // an AMQAuthenticationException.
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
+ }
+
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index c5cdb83bbf..2a44413ac8 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -51,7 +51,13 @@ import javax.jms.TopicSubscriber;
public class DurableSubscriptionTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
-
+
+ /** Timeout for receive() if we are expecting a message */
+ private static final long POSITIVE_RECEIVE_TIMEOUT = 2000;
+
+ /** Timeout for receive() if we are not expecting a message */
+ private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
+
public void testUnsubscribe() throws Exception
{
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -76,16 +82,18 @@ public class DurableSubscriptionTest extends QpidTestCase
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- _logger.info("Receive message on consumer 1:expecting A");
- msg = consumer2.receive();
+ _logger.info("Receive message on consumer 2:expecting A");
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
@@ -96,14 +104,15 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
_logger.info("Receive message on consumer 2 :expecting null");
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
_logger.info("Close connection");
@@ -143,14 +152,16 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session1.createTextMessage("A"));
Message msg;
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- msg = consumer2.receive();
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
consumer2.close();
@@ -220,8 +231,8 @@ public class DurableSubscriptionTest extends QpidTestCase
msg = consumer1.receive(500);
assertNull("There should be no more messages for consumption on consumer1.", msg);
- msg = consumer2.receive();
- assertNotNull(msg);
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
msg = consumer2.receive(500);
assertNull("There should be no more messages for consumption on consumer2.", msg);
@@ -235,10 +246,10 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session0.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals("B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
// Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
@@ -296,7 +307,7 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive();
+ Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull ("Message should have been received", msg);
assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -331,7 +342,7 @@ public class DurableSubscriptionTest extends QpidTestCase
assertNotNull("Subscriber should have been created", liveSubscriber);
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive();
+ Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull ("Message should have been received", msg);
assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -360,13 +371,13 @@ public class DurableSubscriptionTest extends QpidTestCase
// Send 1 matching message and 1 non-matching message
sendMatchingAndNonMatchingMessage(session, producer);
- Message rMsg = subA.receive(1000);
+ Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelector1",
((TextMessage) rMsg).getText());
- rMsg = subA.receive(1000);
+ rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
// Disconnect subscriber
@@ -379,13 +390,13 @@ public class DurableSubscriptionTest extends QpidTestCase
// Check messages are recieved properly
sendMatchingAndNonMatchingMessage(session, producer);
- rMsg = subB.receive(1000);
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelector2",
((TextMessage) rMsg).getText());
- rMsg = subB.receive(1000);
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
session.unsubscribe("testResubscribeWithChangedSelector");
}
@@ -429,5 +440,5 @@ public class DurableSubscriptionTest extends QpidTestCase
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DurableSubscriptionTest.class);
- }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 9c755fcb41..b603455644 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -479,7 +479,7 @@ public class CommitRollbackTest extends QpidTestCase
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
_pubSession.commit();
- assertNotNull(_consumer.receive(100));
+ assertNotNull(_consumer.receive(1000));
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index cc9cfce34b..1bef07fcd5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -55,7 +55,7 @@ public class FailoverBaseCase extends QpidTestCase
{
super.setUp();
setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
- startBroker(FAILING_PORT);
+ startBroker(failingPort);
}
/**
@@ -76,7 +76,7 @@ public class FailoverBaseCase extends QpidTestCase
public void tearDown() throws Exception
{
- stopBroker(FAILING_PORT);
+ stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT);
super.tearDown();
FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
index df8dd0b85b..44ac5b4838 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
@@ -31,6 +31,7 @@ import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -118,7 +119,7 @@ public class LogMonitor
* @throws java.io.FileNotFoundException if the Log file can nolonger be found
* @throws IOException thrown when reading the log file
*/
- public boolean waitForMessage(String message, long wait)
+ public boolean waitForMessage(String message, long wait, boolean printFileOnFailure)
throws FileNotFoundException, IOException
{
// Loop through alerts until we're done or wait ms seconds have passed,
@@ -126,20 +127,35 @@ public class LogMonitor
BufferedReader reader = new BufferedReader(new FileReader(_logfile));
boolean found = false;
long endtime = System.currentTimeMillis() + wait;
+ ArrayList<String> contents = new ArrayList<String>();
while (!found && System.currentTimeMillis() < endtime)
{
while (reader.ready())
{
String line = reader.readLine();
+ contents.add(line);
if (line.contains(message))
{
found = true;
}
}
}
-
+ if (!found && printFileOnFailure)
+ {
+ for (String line : contents)
+ {
+ System.out.println(line);
+ }
+ }
return found;
}
+
+
+ public boolean waitForMessage(String messageCountAlert, long alertLogWaitPeriod) throws FileNotFoundException, IOException
+ {
+ return waitForMessage(messageCountAlert, alertLogWaitPeriod, true);
+ }
+
/**
* Read the log file in to memory as a String
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
index d1a0df30a9..2b9fe8e039 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
@@ -30,25 +30,25 @@ import java.util.List;
public class LogMonitorTest extends TestCase
{
+ private LogMonitor _monitor;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ _monitor = new LogMonitor();
+ _monitor.getMonitoredFile().deleteOnExit(); // Make sure we clean up
+ }
+
/**
* Test that a new file is created when attempting to set up a monitor with
* the default constructor.
*/
public void testMonitor()
{
- // Validate that a NPE is thrown with null input
- try
- {
- LogMonitor montior = new LogMonitor();
- //Validte that the monitor is now running on a new file
- assertTrue("New file does not have correct name:" + montior.
- getMonitoredFile().getName(),
- montior.getMonitoredFile().getName().contains("LogMonitor"));
- }
- catch (IOException ioe)
- {
- fail("IOE thrown:" + ioe);
- }
+ //Validate that the monitor is now running on a new file
+ assertTrue("New file does not have correct name:" + _monitor.
+ getMonitoredFile().getName(),
+ _monitor.getMonitoredFile().getName().contains("LogMonitor"));
}
/**
@@ -63,13 +63,11 @@ public class LogMonitorTest extends TestCase
File testFile = File.createTempFile("testMonitorFile", ".log");
testFile.deleteOnExit();
- LogMonitor monitor;
-
//Ensure that we can create a monitor on a file
try
{
- monitor = new LogMonitor(testFile);
- assertEquals(testFile, monitor.getMonitoredFile());
+ _monitor = new LogMonitor(testFile);
+ assertEquals(testFile, _monitor.getMonitoredFile());
}
catch (IOException ioe)
{
@@ -136,13 +134,12 @@ public class LogMonitorTest extends TestCase
*/
public void testFindMatches_Match() throws IOException
{
- LogMonitor monitor = new LogMonitor();
String message = getName() + ": Test Message";
Logger.getRootLogger().warn(message);
- validateLogContainsMessage(monitor, message);
+ validateLogContainsMessage(_monitor, message);
}
/**
@@ -152,35 +149,17 @@ public class LogMonitorTest extends TestCase
*/
public void testFindMatches_NoMatch() throws IOException
{
- LogMonitor monitor = new LogMonitor();
-
String message = getName() + ": Test Message";
Logger.getRootLogger().warn(message);
String notLogged = "This text was not logged";
- validateLogDoesNotContainsMessage(monitor, notLogged);
- }
-
- public void testWaitForMessage_Found() throws IOException
- {
- LogMonitor monitor = new LogMonitor();
-
- String message = getName() + ": Test Message";
-
- long TIME_OUT = 2000;
-
- logMessageWithDelay(message, TIME_OUT / 2);
-
- assertTrue("Message was not logged ",
- monitor.waitForMessage(message, TIME_OUT));
+ validateLogDoesNotContainsMessage(_monitor, notLogged);
}
public void testWaitForMessage_Timeout() throws IOException
{
- LogMonitor monitor = new LogMonitor();
-
String message = getName() + ": Test Message";
long TIME_OUT = 2000;
@@ -189,41 +168,37 @@ public class LogMonitorTest extends TestCase
// Verify that we can time out waiting for a message
assertFalse("Message was logged ",
- monitor.waitForMessage(message, TIME_OUT / 2));
+ _monitor.waitForMessage(message, TIME_OUT / 2, false));
// Verify that the message did eventually get logged.
assertTrue("Message was never logged.",
- monitor.waitForMessage(message, TIME_OUT));
+ _monitor.waitForMessage(message, TIME_OUT));
}
public void testReset() throws IOException
{
- LogMonitor monitor = new LogMonitor();
-
String message = getName() + ": Test Message";
Logger.getRootLogger().warn(message);
- validateLogContainsMessage(monitor, message);
+ validateLogContainsMessage(_monitor, message);
String LOG_RESET_TEXT = "Log Monitor Reset";
- validateLogDoesNotContainsMessage(monitor, LOG_RESET_TEXT);
+ validateLogDoesNotContainsMessage(_monitor, LOG_RESET_TEXT);
- monitor.reset();
+ _monitor.reset();
- assertEquals("", monitor.readFile());
+ assertEquals("", _monitor.readFile());
}
public void testRead() throws IOException
{
- LogMonitor monitor = new LogMonitor();
-
String message = getName() + ": Test Message";
Logger.getRootLogger().warn(message);
- String fileContents = monitor.readFile();
+ String fileContents = _monitor.readFile();
assertTrue("Logged message not found when reading file.",
fileContents.contains(message));
diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes
index 7577ad8da1..36e7317e31 100644
--- a/qpid/java/test-profiles/010Excludes
+++ b/qpid/java/test-profiles/010Excludes
@@ -4,6 +4,7 @@ org.apache.qpid.client.ResetMessageListenerTest#*
//These tests are for the java broker
org.apache.qpid.server.security.acl.SimpleACLTest#*
org.apache.qpid.server.plugins.PluginTest#*
+org.apache.qpid.server.BrokerStartupTest#*
// This test is not finished
org.apache.qpid.test.testcases.TTLTest#*
@@ -82,3 +83,9 @@ org.apache.qpid.server.logging.*
// CPP Broker does not have a JMX interface to test
org.apache.qpid.management.jmx.*
+
+// 0-10 is not supported by the MethodRegistry
+org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
+
+// QPID-2084 : this test needs more work for 0-10
+org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#*