summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-23 20:55:13 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-23 20:55:13 +0000
commit3debc8267203e6bb2cc6a800d0ffff7e87f1ef65 (patch)
tree7fa160a79a8214c2d4cc3845696b6273e681f31a
parentfa7a35e31c0eac1713801a632398b192af2e0ab2 (diff)
downloadqpid-python-3debc8267203e6bb2cc6a800d0ffff7e87f1ef65.tar.gz
Merged from trunk up to 823464
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829235 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java10
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java330
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java89
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java104
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java4
-rwxr-xr-xqpid/java/test-profiles/010Excludes2
-rw-r--r--qpid/java/test-profiles/Excludes1
-rw-r--r--qpid/java/test-profiles/test-provider.properties2
21 files changed, 582 insertions, 59 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index 6132c890c3..62dd76f832 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -71,7 +71,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
- }
+ }
else
{
if (message.isQueueDeleted())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 5d7adc6371..9918013888 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,6 +54,13 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
+
+ // Protect the broker against out of order frame request.
+ if (virtualHost == null)
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+ }
+
final AMQChannel channel = new AMQChannel(session,channelId,
virtualHost.getMessageStore());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 356a5121e4..d5c6a6d130 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -149,6 +149,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
private boolean _nolocal;
+ private final AtomicBoolean _overfull = new AtomicBoolean(false);
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
{
@@ -1317,6 +1318,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if(_atomicQueueSize.get() > _capacity)
{
+ _overfull.set(true);
//Overfull log message
_logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
@@ -1347,10 +1349,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if(_capacity != 0L)
{
- if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity)
{
- //Underfull log message
- _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+ if(_overfull.compareAndSet(true,false))
+ {//Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+ }
for(AMQChannel c : _blockedChannels.keySet())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
index 33213055ca..c3aa68cc3f 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
@@ -494,6 +494,12 @@ public class PrincipalPermissions
{
AMQQueue queue = ((AMQQueue) parameters[0]);
Map queuePermissions = (Map) _permissions.get(permission);
+
+ if (queuePermissions == null)
+ {
+ //if the outer map is null, the user has no CONSUME rights at all
+ return AuthzResult.DENIED;
+ }
List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 2f5d07e396..b57c834598 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -313,7 +313,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
protected AMQConnectionDelegate _delegate;
// this connection maximum number of prefetched messages
- protected int _maxPrefetch;
+ private int _maxPrefetch;
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index e5980d8b7d..e6c3473cb1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -39,6 +39,15 @@ public interface AMQConnectionDelegate
Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException;
+ /**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException thrown if there is a problem creating the session.
+ */
+ XASession createXASession() throws JMSException;
+
XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
void failoverPrep();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 927929c94a..211f72e9ba 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -100,6 +100,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
/**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException
+ */
+ public XASession createXASession() throws JMSException
+ {
+ return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+ }
+
+ /**
* create an XA Session and start it if required.
*/
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 9876393d4c..45a134a0e6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -191,6 +191,18 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}, _conn).execute();
}
+ /**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException thrown if there is a problem creating the session.
+ */
+ public XASession createXASession() throws JMSException
+ {
+ return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+ }
+
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException, FailoverException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index fc81e32e4d..dd9a00ce10 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -21,7 +21,6 @@
package org.apache.qpid.client;
import java.io.Serializable;
-import java.io.IOException;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -92,7 +91,6 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,8 +113,6 @@ import org.slf4j.LoggerFactory;
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
-
-
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -263,10 +259,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
- private int _defaultPrefetchHighMark;
+ private int _prefetchHighMark;
/** Holds the low mark for prefetched messages, below which the session is resumed. */
- private int _defaultPrefetchLowMark;
+ private int _prefetchLowMark;
/** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
@@ -447,13 +443,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_channelId = channelId;
_messageFactoryRegistry = messageFactoryRegistry;
- _defaultPrefetchHighMark = defaultPrefetchHighMark;
- _defaultPrefetchLowMark = defaultPrefetchLowMark;
+ _prefetchHighMark = defaultPrefetchHighMark;
+ _prefetchLowMark = defaultPrefetchLowMark;
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue =
- new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+ new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
private final AtomicBoolean _suspendState = new AtomicBoolean();
@@ -461,7 +457,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void aboveThreshold(int currentValue)
{
_logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
+ "Above threshold(" + _prefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
_suspendState.set(true);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -471,7 +467,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void underThreshold(int currentValue)
{
_logger.debug(
- "Below threshold(" + _defaultPrefetchLowMark
+ "Below threshold(" + _prefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
_suspendState.set(false);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -481,7 +477,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+ _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
}
@@ -897,7 +893,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false,
messageSelector, null, true, true);
}
@@ -905,7 +901,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
false, false);
}
@@ -913,7 +909,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
false, false);
}
@@ -921,7 +917,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -930,7 +926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -939,7 +935,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
messageSelector, null, false, false);
}
@@ -1363,17 +1359,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public int getDefaultPrefetch()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchHigh()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchLow()
{
- return _defaultPrefetchLowMark;
+ return _prefetchLowMark;
}
public AMQShortString getDefaultQueueExchangeName()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
index 20fa68605a..43025bd724 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
@@ -47,7 +47,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ
public synchronized XASession createXASession() throws JMSException
{
checkNotClosed();
- return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2);
+ return _delegate.createXASession();
}
//-- Interface XAQueueConnection
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
index a016e90d79..e5050b4fbd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -39,7 +39,7 @@ public class ClientProperties
* type: long
*/
public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch";
- public static final String MAX_PREFETCH_DEFAULT = "5000";
+ public static final String MAX_PREFETCH_DEFAULT = "500";
/**
* When true a sync command is sent after every persistent messages.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 8223cd5394..79bc29a4e3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -134,6 +134,8 @@ public class FailoverHandler implements Runnable
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
+
+ // Use a fresh new StateManager for the reconnection attempts
_amqProtocolHandler.setStateManager(new AMQStateManager());
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
new file mode 100644
index 0000000000..5b6f5e35e9
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
@@ -0,0 +1,330 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.failover;
+
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+import org.apache.qpid.AMQConnectionClosedException;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test case based on user reported error.
+ *
+ * Summary:
+ * A user has reported message loss from their application. On bouncing of
+ * the broker the 'lost' messages are delivered to the broker.
+ *
+ * Note:
+ * The client was using Spring so that may influence the situation.
+ *
+ * Issue:
+ * The log files show 7 instances of the following which result in 7
+ * missing messages.
+ *
+ * The client log files show:
+ *
+ * The broker log file show:
+ *
+ *
+ * 7 missing messages have delivery tags 5-11. Which says that they are
+ * sequentially the next message from the broker.
+ *
+ * The only way for the 'without a handler' log to occur is if the consumer
+ * has been removed from the look up table of the dispatcher.
+ * And the only way for the 'null message' log to occur on the broker is is
+ * if the message does not exist in the unacked-map
+ *
+ * The consumer is only removed from the list during session
+ * closure and failover.
+ *
+ * If the session was closed then the broker would requeue the unacked
+ * messages so the potential exists to have an empty map but the broker
+ * will not send a message out after the unacked map has been cleared.
+ *
+ * When failover occurs the _consumer map is cleared and the consumers are
+ * resubscribed. This is down without first stopping any existing
+ * dispatcher so there exists the potential to receive a message after
+ * the _consumer map has been cleared which is how the 'without a handler'
+ * log statement occurs.
+ *
+ * Scenario:
+ *
+ * Looking over logs the sequence that best fits the events is as follows:
+ * - Something causes Mina to be delayed causing the WriteTimoutException.
+ * - This exception is recevied by AMQProtocolHandler#exceptionCaught
+ * - As the WriteTimeoutException is an IOException this will cause
+ * sessionClosed to be called to start failover.
+ * + This is potentially the issues here. All IOExceptions are treated
+ * as connection failure events.
+ * - Failover Runs
+ * + Failover assumes that the previous connection has been closed.
+ * + Failover binds the existing objects (AMQConnection/Session) to the
+ * new connection objects.
+ * - Everything is reported as being successfully failed over.
+ * However, what is neglected is that the original connection has not
+ * been closed.
+ * + So what occurs is that the broker sends a message to the consumer on
+ * the original connection, as it was not notified of the client
+ * failing over.
+ * As the client failover reuses the original AMQSession and Dispatcher
+ * the new messages the broker sends to the old consumer arrives at the
+ * client and is processed by the same AMQSession and Dispatcher.
+ * However, as the failover process cleared the _consumer map and
+ * resubscribe the consumers the Dispatcher does not recognise the
+ * delivery tag and so logs the 'without a handler' message.
+ * - The Dispatcher then attempts to reject the message, however,
+ * + The AMQSession/Dispatcher pair have been swapped to using a new Mina
+ * ProtocolSession as part of the failover process so the reject is
+ * sent down the second connection. The broker receives the Reject
+ * request but as the Message was sent on a different connection the
+ * unacknowledgemap is empty and a 'message is null' log message
+ * produced.
+ *
+ * Test Strategy:
+ *
+ * It should be easy to demonstrate if we can send an IOException to
+ * AMQProtocolHandler#exceptionCaught and then try sending a message.
+ *
+ * The current unknowns here are the type of consumers that are in use.
+ * If it was an exclusive queue(Durable Subscription) then why did the
+ * resubscribe not fail.
+ *
+ * If it was not exclusive then why did the messages not round robin?
+ */
+public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener
+{
+ private CountDownLatch _failoverOccured = new CountDownLatch(1);
+ AMQConnection _connection;
+ Session _session;
+ Queue _queue;
+ MessageConsumer _consumer;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ stopBroker(getFailingPort());
+
+ }
+
+ /**
+ * Test Summary:
+ *
+ * Create a queue consumer and send 10 messages to the broker.
+ *
+ * Consume the first message.
+ * This will pull the rest into the prefetch
+ *
+ * Send an IOException to the MinaProtocolHandler.
+ *
+ * This will force failover to occur.
+ *
+ * 9 messages would normally be expected but it is expected that none will
+ * arrive. As they are still in the prefetch of the first session.
+ *
+ * To free the messages we need to close all connections.
+ * - Simply doing connection.close() and retesting will not be enough as
+ * the original connection's IO layer will still exist and is nolonger
+ * connected to the connection object as a result of failover.
+ *
+ * - Test will need to retain a reference to the original connection IO so
+ * that it can be closed releasing the messages to validate that the
+ * messages have indeed been 'lost' on that sesssion.
+ */
+ public void test() throws Exception
+ {
+ initialiseConnection();
+
+ // Create Producer
+ // Send 10 messages
+ List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10);
+
+ // Consume first messasge
+ Message received = _consumer.receive(2000);
+
+ // Verify received messages
+ assertNotNull("First message not received.", received);
+ assertEquals("Incorrect message Received",
+ messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+
+ // Allow ack to be sent to broker, by performing a synchronous command
+ // along the session.
+// _session.createConsumer(_session.createTemporaryQueue()).close();
+
+ //Retain IO Layer
+ AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession();
+
+ // Send IO Exception - causing failover
+ _connection.getProtocolHandler().
+ exception(new WriteTimeoutException("WriteTimeoutException to cause failover."));
+
+ // Verify Failover occured through ConnectionListener
+ assertTrue("Failover did not occur",
+ _failoverOccured.await(4000, TimeUnit.MILLISECONDS));
+
+ //Verify new protocolSession is not the same as the original
+ assertNotSame("Protocol Session has not changed",
+ protocolSession,
+ _connection.getProtocolHandler().getProtocolSession());
+
+ /***********************************/
+ // This verifies that the bug has been resolved
+
+ // Attempt to consume again. Expect 9 messages
+ for (int count = 1; count < 10; count++)
+ {
+ received = _consumer.receive(2000);
+ assertNotNull("Expected message not received:" + count, received);
+ assertEquals(messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+ }
+
+ //Verify there are no more messages
+ received = _consumer.receive(1000);
+ assertNull("Message receieved when there should be none:" + received,
+ received);
+
+// /***********************************/
+// // This verifies that the bug exists
+//
+// // Attempt to consume remaining 9 messages.. Expecting NONE.
+// // receiving just one message should fail so no need to fail 9 times
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when it should be null:" + received, received);
+//
+//// //Close the Connection which you would assume would free the messages
+//// _connection.close();
+////
+//// // Reconnect
+//// initialiseConnection();
+////
+//// // We should still be unable to receive messages
+//// received = _consumer.receive(1000);
+//// assertNull("Message receieved when it should be null:" + received, received);
+////
+//// _connection.close();
+//
+// // Close original IO layer. Expecting messages to be released
+// protocolSession.closeProtocolSession();
+//
+// // Reconnect and all should be good.
+//// initialiseConnection();
+//
+// // Attempt to consume again. Expect 9 messages
+// for (int count = 1; count < 10; count++)
+// {
+// received = _consumer.receive(2000);
+// assertNotNull("Expected message not received:" + count, received);
+// assertEquals(messages.remove(0).getIntProperty("count"),
+// received.getIntProperty("count"));
+// }
+//
+// //Verify there are no more messages
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when there should be none:" + received,
+// received);
+ }
+
+ private void initialiseConnection()
+ throws Exception
+ {
+ //Create Connection
+ _connection = (AMQConnection) getConnection();
+ _connection.setConnectionListener(this);
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _queue = _session.createQueue(getName());
+
+ // Create Consumer
+ _consumer = _session.createConsumer(_queue);
+
+ //Start connection
+ _connection.start();
+ }
+
+ /** QpidTestCase back port to this release */
+
+ // modified from QTC as sendMessage is not testable.
+ // - should be renamed sendBlankBytesMessage
+ // - should be renamed sendNumberedBytesMessage
+ public List<Message> sendNumberedBytesMessage(Session session, Destination destination,
+ int count) throws Exception
+ {
+ List<Message> messages = new ArrayList<Message>(count);
+
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < count; i++)
+ {
+ Message next = session.createMessage();
+
+ next.setIntProperty("count", count);
+
+ producer.send(next);
+
+ messages.add(next);
+ }
+
+ producer.close();
+ return messages;
+ }
+
+ public void bytesSent(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverOccured.countDown();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index 97147904e1..f220760511 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -26,15 +26,18 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.AbstractTestLogging;
import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
import javax.naming.NamingException;
import java.util.HashMap;
import java.util.Map;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.io.IOException;
-public class ProducerFlowControlTest extends QpidTestCase
+public class ProducerFlowControlTest extends AbstractTestLogging
{
private static final int TIMEOUT = 1500;
@@ -56,10 +59,12 @@ public class ProducerFlowControlTest extends QpidTestCase
private MessageConsumer consumer;
private final AtomicInteger _sentMessages = new AtomicInteger();
- protected void setUp() throws Exception
+ public void setUp() throws Exception
{
super.setUp();
+ _monitor.reset();
+
producerConnection = getConnection();
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -70,7 +75,7 @@ public class ProducerFlowControlTest extends QpidTestCase
}
- protected void tearDown() throws Exception
+ public void tearDown() throws Exception
{
producerConnection.close();
consumerConnection.close();
@@ -117,6 +122,76 @@ public class ProducerFlowControlTest extends QpidTestCase
}
+ public void testBrokerLogMessages()
+ throws JMSException, NamingException, AMQException, InterruptedException, IOException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",800);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+ List<String> results = _monitor.findMatches("QUE-1003");
+
+ assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size());
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+
+ while(consumer.receive(1000) != null);
+
+ results = _monitor.findMatches("QUE-1004");
+
+ assertEquals("Did not find correct number of QUE_1004 queue underfull messages", 1, results.size());
+
+
+
+ }
+
+
+ public void testClientLogMessages()
+ throws JMSException, NamingException, AMQException, InterruptedException, IOException
+ {
+ setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
+ setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
+
+ Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",800);
+ ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).declareAndBind((AMQDestination)queue);
+ producer = session.createProducer(queue);
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(10000);
+ List<String> results = _monitor.findMatches("Message send delayed by");
+ assertEquals("Incorrect number of delay messages logged by client",3,results.size());
+ results = _monitor.findMatches("Message send failed due to timeout waiting on broker enforced flow control");
+ assertEquals("Incorrect number of send failure messages logged by client",1,results.size());
+
+
+
+ }
+
public void testFlowControlOnCapacityResumeEqual()
throws JMSException, NamingException, AMQException, InterruptedException
@@ -131,7 +206,6 @@ public class ProducerFlowControlTest extends QpidTestCase
_sentMessages.set(0);
-
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -212,8 +286,7 @@ public class ProducerFlowControlTest extends QpidTestCase
public void testSendTimeout()
throws JMSException, NamingException, AMQException, InterruptedException
{
- long origTimeoutValue = Long.getLong("qpid.flow_control_wait_failure",AMQSession.DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
- System.setProperty("qpid.flow_control_wait_failure","3000");
+ setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -237,10 +310,6 @@ public class ProducerFlowControlTest extends QpidTestCase
assertNotNull("No timeout exception on sending", e);
- System.setProperty("qpid.flow_control_wait_failure",String.valueOf(origTimeoutValue));
-
-
-
}
private MessageSender sendMessagesAsync(final MessageProducer producer,
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
index bb7b5efc75..c198b33bee 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
@@ -21,31 +21,27 @@
package org.apache.qpid.server.security.acl;
-
-import junit.framework.TestCase;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.*;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.AMQException;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.*;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.naming.NamingException;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
public class SimpleACLTest extends QpidTestCase implements ConnectionListener, ExceptionListener
{
private String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE;//"tcp://localhost:5672";
@@ -54,6 +50,14 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void setUp() throws Exception
{
+ //Performing setUp here would result in a broker with the default ACL test config
+
+ //Each test now calls the private setUpACLTest to allow them to make
+ //individual customisations to the base ACL settings
+ }
+
+ private void setUpACLTest() throws Exception
+ {
final String QPID_HOME = System.getProperty("QPID_HOME");
if (QPID_HOME == null)
@@ -73,8 +77,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
return "amqp://" + username + ":" + password + "@clientid/test?brokerlist='" + getBroker() + "?retries='0''";
}
- public void testAccessAuthorized() throws AMQException, URLSyntaxException
+ public void testAccessAuthorized() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -96,6 +102,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testAccessNoRights() throws Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("guest", "guest");
@@ -120,8 +128,40 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
}
}
- public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException
+ public void testGuestConsumeWithCreateRightsAndWithoutConsumeRights() throws NamingException, ConfigurationException, IOException, Exception
+ {
+ //Customise the ACL config to give the guest user some create (could be any, non-consume) rights to
+ //force creation of a PrincipalPermissions instance to perform the consume rights check against.
+ setConfigurationProperty("virtualhosts.virtualhost.test.security.access_control_list.create.queues.queue.users.user", "guest");
+
+ setUpACLTest();
+
+ try
+ {
+ Connection conn = getConnection("guest", "guest");
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sesh.createConsumer(sesh.createQueue("example.RequestQueue"));
+
+ conn.close();
+ }
+ catch (JMSException e)
+ {
+ Throwable cause = e.getLinkedException();
+
+ assertNotNull("There was no liked exception", cause);
+ assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
+ assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+ }
+ }
+
+ public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -142,6 +182,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -167,8 +209,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
}
}
- public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException
+ public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -191,6 +235,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -212,8 +258,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
}
}
- public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException
+ public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -239,8 +287,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
}
}
- public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException
+ public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -271,6 +321,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -311,8 +363,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
}
- public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException
+ public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -333,6 +387,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -358,6 +414,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -391,8 +449,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
return (Connection) connection;
}
- public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException
+ public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -414,6 +474,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -436,6 +498,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -461,6 +525,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception
{
+ setUpACLTest();
+
Connection connection = null;
try
{
@@ -492,6 +558,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
*/
public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
+ setUpACLTest();
+
//Set up the Server
Connection serverConnection = getConnection("server", "guest");
@@ -572,6 +640,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
index 30cc48691f..eb36522fac 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
@@ -61,7 +61,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con
* @param transacted create a transacted session for this test
* @param mode if not transacted what ack mode to use for this test
* @throws Exception if a problem occured during test setup.
- */
+ */
@Override
protected void init(boolean transacted, int mode) throws Exception
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
index 73308d41c0..4254727d36 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
@@ -77,6 +77,9 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message
{
break;
}
+ // Remember the currentCount as the lastCount for the next cycle.
+ // so we can exit if things get locked up.
+ lastCount = currentCount;
complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index fc82ff1778..b1d14721bd 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -57,7 +57,7 @@ public class FailoverBaseCase extends QpidTestCase
super.setUp();
// Set QPID_WORK to $QPID_WORK/<getFailingPort()>
// or /tmp/<getFailingPort()> if QPID_WORK not set.
- setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK") + "/" + getFailingPort());
startBroker(getFailingPort());
}
@@ -97,7 +97,7 @@ public class FailoverBaseCase extends QpidTestCase
// Ensure we shutdown any secondary brokers, even if we are unable
// to cleanly tearDown the QTC.
stopBroker(getFailingPort());
- FileUtils.deleteDirectory(System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
}
}
diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes
index 488ae24022..757a1e425c 100755
--- a/qpid/java/test-profiles/010Excludes
+++ b/qpid/java/test-profiles/010Excludes
@@ -99,3 +99,5 @@ org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker)
org.apache.qpid.server.queue.ProducerFlowControlTest#*
+//QPID-1950 : Commit to test this failure. This is a MINA only failure so it cannot be tested when using 010.
+org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#*
diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes
index d14d467b89..aa60554c04 100644
--- a/qpid/java/test-profiles/Excludes
+++ b/qpid/java/test-profiles/Excludes
@@ -30,3 +30,4 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck
// QPID-143 : Failover can occur between receive and ack but we don't stop the ack.
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testAutoAck
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDupsOk
+
diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties
index 70a2672263..8cea012c1d 100644
--- a/qpid/java/test-profiles/test-provider.properties
+++ b/qpid/java/test-profiles/test-provider.properties
@@ -34,7 +34,7 @@ connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist
connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
+connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'&failover='roundrobin?cyclecount='20''
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}'