summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-10-11 23:22:08 +0000
committerAidan Skinner <aidan@apache.org>2009-10-11 23:22:08 +0000
commit98cc985dbd81a84cd0b0a969c57cb941680ec81f (patch)
treea9060c1f208897cbd9dd4791b29202c78566993b /qpid/java
parent788f96fd8af146cba5bff57300b1a513988c34b9 (diff)
downloadqpid-python-98cc985dbd81a84cd0b0a969c57cb941680ec81f.tar.gz
Merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@824198 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/etc/log4j.xml4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java109
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java106
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java278
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java3
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java1002
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java37
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java5
-rw-r--r--qpid/java/client/src/main/java/log4j.xml36
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java152
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java13
-rwxr-xr-xqpid/java/common/bin/qpid-run1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java1
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java2
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java2
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java2
-rw-r--r--qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java2
-rw-r--r--qpid/java/module.xml2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java330
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java393
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java123
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java170
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java271
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java193
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java356
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java306
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java170
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java179
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java40
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java148
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java253
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SelectorTest.java306
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java403
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java83
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java221
-rwxr-xr-x[-rw-r--r--]qpid/java/test-profiles/010Excludes9
-rw-r--r--qpid/java/test-profiles/08Excludes2
-rw-r--r--qpid/java/test-profiles/08StandaloneExcludes2
-rw-r--r--qpid/java/test-profiles/Excludes12
-rw-r--r--qpid/java/test-profiles/default.testprofile3
-rw-r--r--qpid/java/test-profiles/log4j-test.xml6
-rw-r--r--qpid/java/test-profiles/test-provider.properties17
84 files changed, 4656 insertions, 1376 deletions
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml
index 8ca43ededd..4b71772a0e 100644
--- a/qpid/java/broker/etc/log4j.xml
+++ b/qpid/java/broker/etc/log4j.xml
@@ -87,6 +87,10 @@
<priority value="debug"/>
</category-->
+ <!-- Set the commons logging that the XML parser uses to WARN, it is very chatty at debug -->
+ <logger name="org.apache.commons">
+ <level value="WARN"/>
+ </logger>
<!-- Log all info events to file -->
<root>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index add5e64ee8..644a33db01 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -27,13 +27,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -42,12 +41,7 @@ import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -59,6 +53,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
@@ -119,6 +114,11 @@ public class AMQChannel
private final AMQProtocolSession _session;
private boolean _closing;
+ private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+ private final AtomicBoolean _blocking = new AtomicBoolean(false);
+
+
private LogActor _actor;
private LogSubject _logSubject;
@@ -783,16 +783,32 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
-
+ /**
+ * Called from the ChannelFlowHandler to suspend this Channel
+ * @param suspended boolean, should this Channel be suspended
+ */
public void setSuspended(boolean suspended)
{
-
-
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
- _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+ // Log Flow Started before we start the subscriptions
+ if (!suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Started"));
+ }
+
+
+ // This section takes two different approaches to perform to perform
+ // the same function. Ensuring that the Subscription has taken note
+ // of the change in Channel State
+ // Here we have become unsuspended and so we ask each the queue to
+ // perform an Async delivery for each of the subscriptions in this
+ // Channel. The alternative would be to ensure that the subscription
+ // had received the change in suspension state. That way the logic
+ // behind decieding to start an async delivery was located with the
+ // Subscription.
if (wasSuspended)
{
// may need to deliver queued messages
@@ -801,6 +817,38 @@ public class AMQChannel
s.getQueue().deliverAsync(s);
}
}
+
+
+ // Here we have become suspended so we need to ensure that each of
+ // the Subscriptions has noticed this change so that we can be sure
+ // they are not still sending messages. Again the code here is a
+ // very simplistic approach to ensure that the change of suspension
+ // has been noticed by each of the Subscriptions. Unlike the above
+ // case we don't actually need to do anything else.
+ if (!wasSuspended)
+ {
+ // may need to deliver queued messages
+ for (Subscription s : _tag2SubscriptionMap.values())
+ {
+ try
+ {
+ s.getSendLock();
+ }
+ finally
+ {
+ s.releaseSendLock();
+ }
+ }
+ }
+
+
+ // Log Suspension only after we have confirmed all suspensions are
+ // stopped.
+ if (suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Stopped"));
+ }
+
}
}
@@ -931,4 +979,37 @@ public class AMQChannel
{
return _actor;
}
+
+ public void block(AMQQueue queue)
+ {
+ if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+ {
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString()));
+ flow(false);
+ }
+ }
+ }
+
+ public void unblock(AMQQueue queue)
+ {
+ if(_blockingQueues.remove(queue))
+ {
+ if(_blocking.compareAndSet(true,false))
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1006());
+
+ flow(true);
+ }
+ }
+ }
+
+ private void flow(boolean flow)
+ {
+ MethodRegistry methodRegistry = _session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
+ _session.writeFrame(responseBody.generateFrame(_channelId));
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 74bb7ee969..5c73e353de 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -108,4 +108,14 @@ public class QueueConfiguration
return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap());
}
+ public long getCapacity()
+ {
+ return _config.getLong("capacity", _vHostConfig.getCapacity());
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index a72c2889d1..641b44bb18 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -104,6 +104,8 @@ public class ServerConfiguration implements SignalHandler
envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth");
envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize");
envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap");
+ envVarMap.put("QPID_QUEUECAPACITY", "capacity");
+ envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity");
envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer");
envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
@@ -290,7 +292,6 @@ public class ServerConfiguration implements SignalHandler
return conf;
}
- @Override
public void handle(Signal arg0)
{
try
@@ -508,6 +509,16 @@ public class ServerConfiguration implements SignalHandler
return getConfig().getLong("minimumAlertRepeatGap", 0);
}
+ public long getCapacity()
+ {
+ return getConfig().getLong("capacity", 0L);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return getConfig().getLong("flowResumeCapacity", getCapacity());
+ }
+
public int getProcessors()
{
return getConfig().getInt("connector.processors", 4);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 0273a13262..6c72025ec2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -166,4 +166,15 @@ public class VirtualHostConfiguration
return _config.getLong("queues.minimumAlertRepeatGap", 0);
}
+
+ public long getCapacity()
+ {
+ return _config.getLong("queues.capacity", 0l);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("queues.flowResumeCapacity", getCapacity());
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java
index 1541d3d892..9954719866 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java
@@ -35,13 +35,11 @@ public class ConfigurationManagementMBean extends AMQManagedObject implements Co
super(ConfigurationManagement.class, ConfigurationManagement.TYPE, ConfigurationManagement.VERSION);
}
- @Override
public String getObjectInstanceName()
{
return ConfigurationManagement.TYPE;
}
- @Override
public void reloadSecurityConfiguration() throws Exception
{
ApplicationRegistry.getInstance().getConfiguration().reparseConfigFile();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index c04b6c559c..620799a81f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -73,7 +73,6 @@ public class DefaultExchangeFactory implements ExchangeFactory
return e;
}
- @Override
public void initialise(VirtualHostConfiguration hostConfig)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index f3cab10ed7..fcf3fd4337 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -71,7 +71,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
- }
+ }
else
{
if (message.isQueueDeleted())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 5d7adc6371..9918013888 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,6 +54,13 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
+
+ // Protect the broker against out of order frame request.
+ if (virtualHost == null)
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+ }
+
final AMQChannel channel = new AMQChannel(session,channelId,
virtualHost.getMessageStore());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
index 4502710dd6..0059a48c06 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
@@ -60,4 +60,9 @@ public abstract class AbstractActor implements LogActor
return _rootLogger;
}
+ public String toString()
+ {
+ return _logString;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
index 9b928accc0..5d2762fd1d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
@@ -36,4 +36,12 @@ public class BrokerActor extends AbstractActor
_logString = "[Broker] ";
}
+
+ public BrokerActor(String name, RootMessageLogger logger)
+ {
+ super(logger);
+
+ _logString = "[Broker(" + name + ")] ";
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
index 5ced7cc0b9..9169a1a651 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
@@ -249,12 +249,16 @@ CHN-1003 = Close
# 0 - bytes allowed in prefetch
# 1 - number of messagse.
CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
#Queue
# 0 - owner
# 1 - priority
QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
#Exchange
# 0 - type
@@ -269,4 +273,4 @@ BND-1002 = Deleted
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
-SUB-1003 = State : {0} \ No newline at end of file
+SUB-1003 = State : {0}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 2a692344d0..184504717e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -160,6 +160,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void setMinimumAlertRepeatGap(long value);
+ long getCapacity();
+
+ void setCapacity(long capacity);
+
+
+ long getFlowResumeCapacity();
+
+ void setFlowResumeCapacity(long flowResumeCapacity);
+
+
+
void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
long clearQueue(StoreContext storeContext) throws AMQException;
@@ -180,6 +191,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void stop();
+ void checkCapacity(AMQChannel channel);
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 7509350e65..267ccf43ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -26,11 +26,105 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+import java.util.HashMap;
+
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ private abstract static class QueueProperty
+ {
+
+ private final AMQShortString _argumentName;
+
+
+ public QueueProperty(String argumentName)
+ {
+ _argumentName = new AMQShortString(argumentName);
+ }
+
+ public AMQShortString getArgumentName()
+ {
+ return _argumentName;
+ }
+
+
+ public abstract void setPropertyValue(AMQQueue queue, Object value);
+
+ }
+
+ private abstract static class QueueLongProperty extends QueueProperty
+ {
+
+ public QueueLongProperty(String argumentName)
+ {
+ super(argumentName);
+ }
+
+ public void setPropertyValue(AMQQueue queue, Object value)
+ {
+ if(value instanceof Number)
+ {
+ setPropertyValue(queue, ((Number)value).longValue());
+ }
+
+ }
+
+ abstract void setPropertyValue(AMQQueue queue, long value);
+
+
+ }
+
+ private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
+ new QueueLongProperty("x-qpid-maximum-message-age")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageAge(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-size")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageSize(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-count")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageCount(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMinimumAlertRepeatGap(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setCapacity(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-flow-resume-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setFlowResumeCapacity(value);
+ }
+ }
+
+ };
+
+
+
public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
@@ -53,6 +147,18 @@ public class AMQQueueFactory
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+
+ if(arguments != null)
+ {
+ for(QueueProperty p : DECLAREABLE_PROPERTIES)
+ {
+ if(arguments.containsKey(p.getArgumentName()))
+ {
+ p.setPropertyValue(q, arguments.get(p.getArgumentName()));
+ }
+ }
+ }
+
return q;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index dbad5438dc..3b58f05f93 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -42,8 +42,7 @@ public class QueueEntryImpl implements QueueEntry
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
-
+ private final AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
@@ -177,13 +176,21 @@ public class QueueEntryImpl implements QueueEntry
public String debugIdentity()
{
- return getMessage().debugIdentity();
+ AMQMessage message = getMessage();
+ if (message == null)
+ {
+ return "null";
+ }
+ else
+ {
+ return message.debugIdentity();
+ }
}
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return getMessage().immediateAndNotDelivered();
}
public void setRedelivered(boolean b)
@@ -385,4 +392,5 @@ public class QueueEntryImpl implements QueueEntry
{
return _queueEntryList;
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b14b92b014..d781dc4dea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1,11 +1,10 @@
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +34,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.AMQChannel;
/*
*
@@ -96,6 +96,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final Executor _asyncDelivery;
private final AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
/** max allowed size(KB) of a single message */
public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -122,6 +124,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private LogSubject _logSubject;
private LogActor _logActor;
+
+ private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+ private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+ private final AtomicBoolean _overfull = new AtomicBoolean(false);
+
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -508,8 +515,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
_deliveredMessages.incrementAndGet();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity());
+ }
sub.send(entry);
-
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
@@ -626,6 +636,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new FailedDequeueException(_name.toString(), e);
}
+ checkCapacity();
+
}
private void decrementQueueSize(final QueueEntry entry)
@@ -1170,11 +1182,64 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- public void deliverAsync()
+ public void checkCapacity(AMQChannel channel)
{
- _stateChangeCount.incrementAndGet();
+ if(_capacity != 0l)
+ {
+ if(_atomicQueueSize.get() > _capacity)
+ {
+ _overfull.set(true);
+ //Overfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
+
+ if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
+ {
+ channel.block(this);
+ }
+
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+
+ //Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+ channel.unblock(this);
+ _blockedChannels.remove(channel);
+
+ }
+
+ }
+
+
+
+ }
+ }
+
+ private void checkCapacity()
+ {
+ if(_capacity != 0L)
+ {
+ if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+ if(_overfull.compareAndSet(true,false))
+ {//Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+ }
- Runner runner = new Runner();
+
+ for(AMQChannel c : _blockedChannels.keySet())
+ {
+ c.unblock(this);
+ _blockedChannels.remove(c);
+ }
+ }
+ }
+ }
+
+
+ public void deliverAsync()
+ {
+ Runner runner = new Runner(_stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1187,13 +1252,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(new SubFlushRunner(sub));
}
+
private class Runner implements ReadWriteRunnable
{
+ String _name;
+ public Runner(long count)
+ {
+ _name = "QueueRunner-" + count + "-" + _logActor;
+ }
+
public void run()
{
+ String originalName = Thread.currentThread().getName();
try
{
+ Thread.currentThread().setName(_name);
CurrentActor.set(_logActor);
+
processQueue(this);
}
catch (AMQException e)
@@ -1203,9 +1278,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
finally
{
CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
}
-
-
}
public boolean isRead()
@@ -1217,6 +1291,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return true;
}
+
+ public String toString()
+ {
+ return _name;
+ }
}
private class SubFlushRunner implements ReadWriteRunnable
@@ -1230,27 +1309,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void run()
{
- boolean complete = false;
- try
- {
- CurrentActor.set(_sub.getLogActor());
- complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
- }
- catch (AMQException e)
- {
- _logger.error(e);
+ String originalName = Thread.currentThread().getName();
+ try{
+ Thread.currentThread().setName("SubFlushRunner-"+_sub);
+
+ boolean complete = false;
+ try
+ {
+ CurrentActor.set(_sub.getLogActor());
+ complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ if (!complete && !_sub.isSuspended())
+ {
+ _asyncDelivery.execute(this);
+ }
}
finally
{
- CurrentActor.remove();
- }
- if (!complete && !_sub.isSuspended())
- {
- _asyncDelivery.execute(this);
+ Thread.currentThread().setName(originalName);
}
-
}
public boolean isRead()
@@ -1278,7 +1366,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
sub.getSendLock();
- atTail = attemptDelivery(sub);
+ atTail = attemptDelivery(sub);
if (atTail && sub.isAutoClose())
{
unregisterSubscription(sub);
@@ -1308,63 +1396,81 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return atTail;
}
+ /**
+ * Attempt delivery for the given subscription.
+ *
+ * Looks up the next node for the subscription and attempts to deliver it.
+ *
+ * @param sub
+ * @return true if we have completed all possible deliveries for this sub.
+ * @throws AMQException
+ */
private boolean attemptDelivery(Subscription sub) throws AMQException
{
boolean atTail = false;
boolean advanced = false;
- boolean subActive = sub.isActive();
+ boolean subActive = sub.isActive() && !sub.isSuspended();
if (subActive)
{
QueueEntry node = moveSubscriptionToNextNode(sub);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity());
+ }
if (!(node.isAcquired() || node.isDeleted()))
{
- if (!sub.isSuspended())
+ if (sub.hasInterest(node))
{
- if (sub.hasInterest(node))
+ if (!sub.wouldSuspend(node))
{
- if (!sub.wouldSuspend(node))
+ if (!sub.isBrowser() && !node.acquire(sub))
{
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
- }
- else
+ sub.restoreCredit(node);
+ }
+ else
+ {
+ deliverMessage(sub, node);
+
+ if (sub.isBrowser())
{
- deliverMessage(sub, node);
+ QueueEntry newNode = _entries.next(node);
- if (sub.isBrowser())
+ if (newNode != null)
{
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
+ advanced = true;
+ sub.setLastSeenEntry(node, newNode);
+ node = sub.getLastSeenEntry();
}
+
}
-
- }
- else // Not enough Credit for message and wouldSuspend
- {
- //QPID-1187 - Treat the subscription as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
- node.addStateChangeListener(new QueueEntryListener(sub, node));
}
+
}
- else
+ else // Not enough Credit for message and wouldSuspend
{
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
+ //QPID-1187 - Treat the subscription as suspended for this message
+ // and wait for the message to be removed to continue delivery.
+
+ // 2009-09-30 : MR : setting subActive = false only causes, this
+ // particular delivery attempt to end. This is called from
+ // flushSubscription and processQueue both of which attempt
+ // delivery a number of times. Won't a bytes limited
+ // subscriber with not enough credit for the next message
+ // create a lot of new QELs? How about a browser that calls
+ // this method LONG.MAX_LONG times!
+ subActive = false;
+ node.addStateChangeListener(new QueueEntryListener(sub, node));
+ }
+ }
+ else
+ {
+ // this subscription is not interested in this node so we can skip over it
+ QueueEntry newNode = _entries.next(node);
+ if (newNode != null)
+ {
+ sub.setLastSeenEntry(node, newNode);
}
}
-
}
atTail = (_entries.next(node) == null) && !advanced;
}
@@ -1409,6 +1515,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity()));
+ }
+
return node;
}
@@ -1423,6 +1535,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asynchronousRunner.compareAndSet(runner, null);
+ // For every message enqueue/requeue the we fire deliveryAsync() which
+ // increases _stateChangeCount. If _sCC changes whilst we are in our loop
+ // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
+ // then we will continue to run for a maximum of iterations.
+ // So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
{
// we want to have one extra loop after every subscription has reached the point where it cannot move
@@ -1442,20 +1559,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//iterate over the subscribers and try to advance their pointer
while (subscriptionIter.advance())
{
- boolean closeConsumer = false;
Subscription sub = subscriptionIter.getNode().getSubscription();
sub.getSendLock();
try
{
- if (sub != null)
- {
-
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null)
- {
- done = attemptDelivery(sub);
- }
- }
+ done = attemptDelivery(sub);
if (done)
{
if (extraLoops == 0)
@@ -1492,11 +1600,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rescheduling runner:" + runner);
+ }
_asyncDelivery.execute(runner);
}
}
- @Override
public void checkMessageStatus() throws AMQException
{
@@ -1604,6 +1715,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ public long getCapacity()
+ {
+ return _capacity;
+ }
+
+ public void setCapacity(long capacity)
+ {
+ _capacity = capacity;
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _flowResumeCapacity;
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ _flowResumeCapacity = flowResumeCapacity;
+ }
+
+
public Set<NotificationCheck> getNotificationChecks()
{
return _notificationChecks;
@@ -1673,6 +1805,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setMaximumMessageSize(config.getMaximumMessageSize());
setMaximumMessageCount(config.getMaximumMessageCount());
setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ _capacity = config.getCapacity();
+ _flowResumeCapacity = config.getFlowResumeCapacity();
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 9575bfa1ec..1affdd6590 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -102,7 +102,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
- instance.initialise();
+ instance.initialise(instanceID);
}
catch (Exception e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index a049d1eb09..831f928832 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -42,18 +42,22 @@ import java.io.File;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
+ private String _registryName;
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
super(new ServerConfiguration(configurationURL));
}
- public void initialise() throws Exception
+ public void initialise(int instanceID) throws Exception
{
_rootMessageLogger = new RootMessageLoggerImpl(_configuration,
new Log4jMessageLogger());
+
+ _registryName = String.valueOf(instanceID);
+
// Set the Actor for current log messages
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
+ CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));
@@ -83,7 +87,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void close() throws Exception
{
//Set the Actor for Broker Shutdown
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
+ CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
try
{
super.close();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index ddb3fce5d2..92accb3499 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -43,8 +43,9 @@ public interface IApplicationRegistry
* Initialise the application registry. All initialisation must be done in this method so that any components
* that need access to the application registry itself for initialisation are able to use it. Attempting to
* initialise in the constructor will lead to failures since the registry reference will not have been set.
+ * @param instanceID the instanceID that we can use to identify this AR.
*/
- void initialise() throws Exception;
+ void initialise(int instanceID) throws Exception;
/**
* Shutdown this Registry
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
index f852514444..d2bbb795e9 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.server.security.access;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.exchange.Exchange;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
public class PrincipalPermissions
{
@@ -41,6 +42,7 @@ public class PrincipalPermissions
private static final Object CREATE_QUEUES_KEY = new Object();
private static final Object CREATE_EXCHANGES_KEY = new Object();
+
private static final Object CREATE_QUEUE_TEMPORARY_KEY = new Object();
private static final Object CREATE_QUEUE_QUEUES_KEY = new Object();
private static final Object CREATE_QUEUE_EXCHANGES_KEY = new Object();
@@ -80,248 +82,257 @@ public class PrincipalPermissions
{
switch (permission)
{
- case ACCESS:
- break; // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS
- case BIND:
- break; // All the details are currently included in the create setup.
case CONSUME: // Parameters : AMQShortString queueName, Boolean Temporary, Boolean ownQueueOnly
- Map consumeRights = (Map) _permissions.get(permission);
-
- if (consumeRights == null)
- {
- consumeRights = new ConcurrentHashMap();
- _permissions.put(permission, consumeRights);
- }
-
- //if we have parametsre
- if (parameters.length > 0)
- {
- AMQShortString queueName = (AMQShortString) parameters[0];
- Boolean temporary = (Boolean) parameters[1];
- Boolean ownQueueOnly = (Boolean) parameters[2];
-
- if (temporary)
- {
- consumeRights.put(CONSUME_TEMPORARY_KEY, true);
- }
- else
- {
- consumeRights.put(CONSUME_TEMPORARY_KEY, false);
- }
-
- if (ownQueueOnly)
- {
- consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true);
- }
- else
- {
- consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false);
- }
-
-
- LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY);
- if (queues == null)
- {
- queues = new LinkedList();
- consumeRights.put(CONSUME_QUEUES_KEY, queues);
- }
-
- if (queueName != null)
- {
- queues.add(queueName);
- }
- }
-
-
+ grantConsume(permission, parameters);
break;
case CREATEQUEUE: // Parameters : Boolean temporary, AMQShortString queueName
// , AMQShortString exchangeName , AMQShortString routingKey
-
- Map createRights = (Map) _permissions.get(permission);
-
- if (createRights == null)
- {
- createRights = new ConcurrentHashMap();
- _permissions.put(permission, createRights);
-
- }
-
- //The existence of the empty map mean permission to all.
- if (parameters.length == 0)
- {
- return;
- }
-
- Boolean temporary = (Boolean) parameters[0];
-
- AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
- AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null;
- //Set the routingkey to the specified value or the queueName if present
- AMQShortString routingKey = (parameters.length > 3 && null != parameters[3]) ? (AMQShortString) parameters[3] : queueName;
-
- // Get the queues map
- Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
-
- if (create_queues == null)
- {
- create_queues = new ConcurrentHashMap();
- createRights.put(CREATE_QUEUES_KEY, create_queues);
- }
-
- //Allow all temp queues to be created
- create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary);
-
- //Create empty list of queues
- Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
-
- if (create_queues_queues == null)
- {
- create_queues_queues = new ConcurrentHashMap();
- create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues);
- }
-
- // We are granting CREATE rights to all temporary queues only
- if (parameters.length == 1)
- {
- return;
- }
-
- // if we have a queueName then we need to store any associated exchange / rk bindings
- if (queueName != null)
- {
- Map queue = (Map) create_queues_queues.get(queueName);
- if (queue == null)
- {
- queue = new ConcurrentHashMap();
- create_queues_queues.put(queueName, queue);
- }
-
- if (exchangeName != null)
- {
- queue.put(exchangeName, routingKey);
- }
-
- //If no exchange is specified then the presence of the queueName in the map says any exchange is ok
- }
-
- // Store the exchange that we are being granted rights to. This will be used as part of binding
-
- //Lookup the list of exchanges
- Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
-
- if (create_queues_exchanges == null)
- {
- create_queues_exchanges = new ConcurrentHashMap();
- create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges);
- }
-
- //if we have an exchange
- if (exchangeName != null)
- {
- //Retrieve the list of permitted exchanges.
- Map exchanges = (Map) create_queues_exchanges.get(exchangeName);
-
- if (exchanges == null)
- {
- exchanges = new ConcurrentHashMap();
- create_queues_exchanges.put(exchangeName, exchanges);
- }
-
- //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY
- exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary);
-
- //Store the binding details of queue/rk for this exchange.
- if (queueName != null)
- {
- //Retrieve the list of permitted routingKeys.
- Map rKeys = (Map) exchanges.get(exchangeName);
-
- if (rKeys == null)
- {
- rKeys = new ConcurrentHashMap();
- exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys);
- }
-
- rKeys.put(queueName, routingKey);
- }
- }
+ grantCreateQueue(permission, parameters);
break;
case CREATEEXCHANGE:
// Parameters AMQShortString exchangeName , AMQShortString Class
- Map rights = (Map) _permissions.get(permission);
- if (rights == null)
- {
- rights = new ConcurrentHashMap();
- _permissions.put(permission, rights);
- }
-
- Map create_exchanges = (Map) rights.get(CREATE_EXCHANGES_KEY);
- if (create_exchanges == null)
- {
- create_exchanges = new ConcurrentHashMap();
- rights.put(CREATE_EXCHANGES_KEY, create_exchanges);
- }
-
- //Should perhaps error if parameters[0] is null;
- AMQShortString name = parameters.length > 0 ? (AMQShortString) parameters[0] : null;
- AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : new AMQShortString("direct");
-
- //Store the exchangeName / class mapping if the mapping is null
- rights.put(name, className);
- break;
- case DELETE:
+ grantCreateExchange(permission, parameters);
break;
-
case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
- Map publishRights = (Map) _permissions.get(permission);
-
- if (publishRights == null)
- {
- publishRights = new ConcurrentHashMap();
- _permissions.put(permission, publishRights);
- }
-
- if (parameters == null || parameters.length == 0)
- {
- //If we have no parameters then allow publish to all destinations
- // this is signified by having a null value for publish_exchanges
- }
- else
- {
- Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
-
- if (publish_exchanges == null)
- {
- publish_exchanges = new ConcurrentHashMap();
- publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges);
- }
-
-
- HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]);
-
- // Check to see if we have a routing key
- if (parameters.length == 2)
- {
- if (routingKeys == null)
- {
- routingKeys = new HashSet<AMQShortString>();
- }
- //Add routing key to permitted publish destinations
- routingKeys.add(parameters[1]);
- }
-
- // Add the updated routingkey list or null if all values allowed
- publish_exchanges.put(parameters[0], routingKeys);
- }
+ grantPublish(permission, parameters);
break;
+ /* The other cases just fall through to no-op */
+ case DELETE:
+ case ACCESS: // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS
+ case BIND: // All the details are currently included in the create setup.
case PURGE:
- break;
case UNBIND:
break;
}
}
+ private void grantPublish(Permission permission, Object... parameters) {
+ Map publishRights = (Map) _permissions.get(permission);
+
+ if (publishRights == null)
+ {
+ publishRights = new ConcurrentHashMap();
+ _permissions.put(permission, publishRights);
+ }
+
+ if (parameters == null || parameters.length == 0)
+ {
+ //If we have no parameters then allow publish to all destinations
+ // this is signified by having a null value for publish_exchanges
+ }
+ else
+ {
+ Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+ if (publish_exchanges == null)
+ {
+ publish_exchanges = new ConcurrentHashMap();
+ publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges);
+ }
+
+
+ HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]);
+
+ // Check to see if we have a routing key
+ if (parameters.length == 2)
+ {
+ if (routingKeys == null)
+ {
+ routingKeys = new HashSet<AMQShortString>();
+ }
+ //Add routing key to permitted publish destinations
+ routingKeys.add(parameters[1]);
+ }
+
+ // Add the updated routingkey list or null if all values allowed
+ publish_exchanges.put(parameters[0], routingKeys);
+ }
+ }
+
+ private void grantCreateExchange(Permission permission, Object... parameters) {
+ Map rights = (Map) _permissions.get(permission);
+ if (rights == null)
+ {
+ rights = new ConcurrentHashMap();
+ _permissions.put(permission, rights);
+ }
+
+ Map create_exchanges = (Map) rights.get(CREATE_EXCHANGES_KEY);
+ if (create_exchanges == null)
+ {
+ create_exchanges = new ConcurrentHashMap();
+ rights.put(CREATE_EXCHANGES_KEY, create_exchanges);
+ }
+
+ //Should perhaps error if parameters[0] is null;
+ AMQShortString name = parameters.length > 0 ? (AMQShortString) parameters[0] : null;
+ AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : new AMQShortString("direct");
+
+ //Store the exchangeName / class mapping if the mapping is null
+ rights.put(name, className);
+ }
+
+ private void grantCreateQueue(Permission permission, Object... parameters) {
+ Map createRights = (Map) _permissions.get(permission);
+
+ if (createRights == null)
+ {
+ createRights = new ConcurrentHashMap();
+ _permissions.put(permission, createRights);
+
+ }
+
+ //The existence of the empty map mean permission to all.
+ if (parameters.length == 0)
+ {
+ return;
+ }
+
+ Boolean temporary = (Boolean) parameters[0];
+
+ AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
+ AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null;
+ //Set the routingkey to the specified value or the queueName if present
+ AMQShortString routingKey = (parameters.length > 3 && null != parameters[3]) ? (AMQShortString) parameters[3] : queueName;
+
+ // Get the queues map
+ Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+ if (create_queues == null)
+ {
+ create_queues = new ConcurrentHashMap();
+ createRights.put(CREATE_QUEUES_KEY, create_queues);
+ }
+
+ //Allow all temp queues to be created
+ create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary);
+
+ //Create empty list of queues
+ Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+ if (create_queues_queues == null)
+ {
+ create_queues_queues = new ConcurrentHashMap();
+ create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues);
+ }
+
+ // We are granting CREATE rights to all temporary queues only
+ if (parameters.length == 1)
+ {
+ return;
+ }
+
+ // if we have a queueName then we need to store any associated exchange / rk bindings
+ if (queueName != null)
+ {
+ Map queue = (Map) create_queues_queues.get(queueName);
+ if (queue == null)
+ {
+ queue = new ConcurrentHashMap();
+ create_queues_queues.put(queueName, queue);
+ }
+
+ if (exchangeName != null)
+ {
+ queue.put(exchangeName, routingKey);
+ }
+
+ //If no exchange is specified then the presence of the queueName in the map says any exchange is ok
+ }
+
+ // Store the exchange that we are being granted rights to. This will be used as part of binding
+
+ //Lookup the list of exchanges
+ Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+ if (create_queues_exchanges == null)
+ {
+ create_queues_exchanges = new ConcurrentHashMap();
+ create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges);
+ }
+
+ //if we have an exchange
+ if (exchangeName != null)
+ {
+ //Retrieve the list of permitted exchanges.
+ Map exchanges = (Map) create_queues_exchanges.get(exchangeName);
+
+ if (exchanges == null)
+ {
+ exchanges = new ConcurrentHashMap();
+ create_queues_exchanges.put(exchangeName, exchanges);
+ }
+
+ //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY
+ exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary);
+
+ //Store the binding details of queue/rk for this exchange.
+ if (queueName != null)
+ {
+ //Retrieve the list of permitted routingKeys.
+ Map rKeys = (Map) exchanges.get(exchangeName);
+
+ if (rKeys == null)
+ {
+ rKeys = new ConcurrentHashMap();
+ exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys);
+ }
+
+ rKeys.put(queueName, routingKey);
+ }
+ }
+ }
+
+ private void grantConsume(Permission permission, Object... parameters) {
+ Map consumeRights = (Map) _permissions.get(permission);
+
+ if (consumeRights == null)
+ {
+ consumeRights = new ConcurrentHashMap();
+ _permissions.put(permission, consumeRights);
+ }
+
+ //if we have parametsre
+ if (parameters.length > 0)
+ {
+ AMQShortString queueName = (AMQShortString) parameters[0];
+ Boolean temporary = (Boolean) parameters[1];
+ Boolean ownQueueOnly = (Boolean) parameters[2];
+
+ if (temporary)
+ {
+ consumeRights.put(CONSUME_TEMPORARY_KEY, true);
+ }
+ else
+ {
+ consumeRights.put(CONSUME_TEMPORARY_KEY, false);
+ }
+
+ if (ownQueueOnly)
+ {
+ consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true);
+ }
+ else
+ {
+ consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false);
+ }
+
+
+ LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY);
+ if (queues == null)
+ {
+ queues = new LinkedList();
+ consumeRights.put(CONSUME_QUEUES_KEY, queues);
+ }
+
+ if (queueName != null)
+ {
+ queues.add(queueName);
+ }
+ }
+ }
+
/**
*
* @param permission the type of permission to check
@@ -346,267 +357,286 @@ public class PrincipalPermissions
return AuthzResult.ALLOWED; // This is here for completeness but the SimpleXML ACLManager never calls it.
// The existence of this user specific PP can be validated in the map SimpleXML maintains.
case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey
-
- Exchange exchange = (Exchange) parameters[1];
-
- AMQQueue bind_queueName = (AMQQueue) parameters[2];
- AMQShortString routingKey = (AMQShortString) parameters[3];
-
- //Get all Create Rights for this user
- Map bindCreateRights = (Map) _permissions.get(Permission.CREATEQUEUE);
-
- //Look up the Queue Creation Rights
- Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY);
-
- //Lookup the list of queues
- Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY);
-
- // Check and see if we have a queue white list to check
- if (bind_create_queues_queues != null)
- {
- //There a white list for queues
- Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName);
-
- if (exchangeDetails == null) //Then all queue can be bound to all exchanges.
- {
- return AuthzResult.ALLOWED;
- }
-
- // Check to see if we have a white list of routingkeys to check
- Map rkeys = (Map) exchangeDetails.get(exchange.getName());
-
- // if keys is null then any rkey is allowed on this exchange
- if (rkeys == null)
- {
- // There is no routingkey white list
- return AuthzResult.ALLOWED;
- }
- else
- {
- // We have routingKeys so a match must be found to allowed binding
- Iterator keys = rkeys.keySet().iterator();
-
- boolean matched = false;
- while (keys.hasNext() && !matched)
- {
- AMQShortString rkey = (AMQShortString) keys.next();
- if (rkey.endsWith("*"))
- {
- matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString());
- }
- else
- {
- matched = routingKey.equals(rkey);
- }
- }
-
-
- return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
-
-
- }
- else
- {
- //There a is no white list for queues
-
- // So can allow all queues to be bound
- // but we should first check and see if we have a temp queue and validate that we are allowed
- // to bind temp queues.
-
- //Check to see if we have a temporary queue
- if (bind_queueName.isAutoDelete())
- {
- // Check and see if we have an exchange white list.
- Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
-
- // If the exchange exists then we must check to see if temporary queues are allowed here
- if (bind_exchanges != null)
- {
- // Check to see if the requested exchange is allowed.
- Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName());
-
- return ((Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
-
- //no white list so all allowed, drop through to return true below.
- }
-
- // not a temporary queue and no white list so all allowed.
- return AuthzResult.ALLOWED;
- }
-
+ return authoriseBind(parameters);
case CREATEQUEUE:// Parameters : boolean autodelete, AMQShortString name
-
- Map createRights = (Map) _permissions.get(permission);
-
- // If there are no create rights then deny request
- if (createRights == null)
- {
- return AuthzResult.DENIED;
- }
-
- //Look up the Queue Creation Rights
- Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
-
- //Lookup the list of queues allowed to be created
- Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
-
-
- AMQShortString queueName = (AMQShortString) parameters[1];
- Boolean autoDelete = (Boolean) parameters[0];
-
- if (autoDelete)// we have a temporary queue
- {
- return ((Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- else
- {
- // If there is a white list then check
- if (create_queues_queues == null || create_queues_queues.containsKey(queueName))
- {
- return AuthzResult.ALLOWED;
- }
- else
- {
- return AuthzResult.DENIED;
- }
-
- }
+ return authoriseCreateQueue(permission, parameters);
case CREATEEXCHANGE:
- Map rights = (Map) _permissions.get(permission);
-
- AMQShortString exchangeName = (AMQShortString) parameters[0];
-
- // If the exchange list is doesn't exist then all is allowed else
- // check the valid exchanges
- if (rights == null || rights.containsKey(exchangeName))
- {
- return AuthzResult.ALLOWED;
- }
- else
- {
- return AuthzResult.DENIED;
- }
+ return authoriseCreateExchange(permission, parameters);
case CONSUME: // Parameters : AMQQueue
-
- if (parameters.length == 1 && parameters[0] instanceof AMQQueue)
- {
- AMQQueue queue = ((AMQQueue) parameters[0]);
- Map queuePermissions = (Map) _permissions.get(permission);
-
- List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY);
-
- Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY);
- Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY);
-
- // If user is allowed to publish to temporary queues and this is a temp queue then allow it.
- if (temporayQueues)
- {
- if (queue.isAutoDelete())
- // This will allow consumption from any temporary queue including ones not owned by this user.
- // Of course the exclusivity will not be broken.
- {
- // if not limited to ownQueuesOnly then ok else check queue Owner.
- return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- else
- {
- return AuthzResult.DENIED;
- }
- }
-
- // if queues are white listed then ensure it is ok
- if (queues != null)
- {
- // if no queues are listed then ALL are ok othereise it must be specified.
- if (ownQueuesOnly)
- {
- if (queue.getOwner().equals(_user))
- {
- return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- else
- {
- return AuthzResult.DENIED;
- }
- }
-
- // If we are
- return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- }
-
- // Can't authenticate without the right parameters
- return AuthzResult.DENIED;
- case DELETE:
- break;
-
+ return authoriseConsume(permission, parameters);
case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
- Map publishRights = (Map) _permissions.get(permission);
-
- if (publishRights == null)
- {
- return AuthzResult.DENIED;
- }
-
- Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
-
- // Having no exchanges listed gives full publish rights to all exchanges
- if (exchanges == null)
- {
- return AuthzResult.ALLOWED;
- }
- // Otherwise exchange must be listed in the white list
-
- // If the map doesn't have the exchange then it isn't allowed
- if (!exchanges.containsKey(((Exchange) parameters[0]).getName()))
- {
- return AuthzResult.DENIED;
- }
- else
- {
-
- // Get valid routing keys
- HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName());
-
- // Having no routingKeys in the map then all are allowed.
- if (routingKeys == null)
- {
- return AuthzResult.ALLOWED;
- }
- else
- {
- // We have routingKeys so a match must be found to allowed binding
- Iterator keys = routingKeys.iterator();
-
-
- AMQShortString publishRKey = (AMQShortString)parameters[1];
-
- boolean matched = false;
- while (keys.hasNext() && !matched)
- {
- AMQShortString rkey = (AMQShortString) keys.next();
-
- if (rkey.endsWith("*"))
- {
- matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1));
- }
- else
- {
- matched = publishRKey.equals(rkey);
- }
- }
- return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- }
+ return authorisePublish(permission, parameters);
+ /* Fall through */
+ case DELETE:
case PURGE:
- break;
case UNBIND:
- break;
-
+ default:
+ return AuthzResult.DENIED;
}
- return AuthzResult.DENIED;
}
+
+ private AuthzResult authoriseConsume(Permission permission, Object... parameters) {
+ if (parameters.length == 1 && parameters[0] instanceof AMQQueue)
+ {
+ AMQQueue queue = ((AMQQueue) parameters[0]);
+ Map queuePermissions = (Map) _permissions.get(permission);
+
+ if (queuePermissions == null)
+ {
+ //if the outer map is null, the user has no CONSUME rights at all
+ return AuthzResult.DENIED;
+ }
+
+ List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY);
+
+ Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY);
+ Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY);
+
+ // If user is allowed to publish to temporary queues and this is a temp queue then allow it.
+ if (temporayQueues)
+ {
+ if (queue.isAutoDelete())
+ // This will allow consumption from any temporary queue including ones not owned by this user.
+ // Of course the exclusivity will not be broken.
+ {
+ // if not limited to ownQueuesOnly then ok else check queue Owner.
+ return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ else
+ {
+ return AuthzResult.DENIED;
+ }
+ }
+
+ // if queues are white listed then ensure it is ok
+ if (queues != null)
+ {
+ // if no queues are listed then ALL are ok othereise it must be specified.
+ if (ownQueuesOnly)
+ {
+ if (queue.getOwner().equals(_user))
+ {
+ return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ else
+ {
+ return AuthzResult.DENIED;
+ }
+ }
+
+ // If we are
+ return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ }
+
+ // Can't authenticate without the right parameters
+ return AuthzResult.DENIED;
+ }
+
+ private AuthzResult authorisePublish(Permission permission, Object... parameters) {
+ Map publishRights = (Map) _permissions.get(permission);
+
+ if (publishRights == null)
+ {
+ return AuthzResult.DENIED;
+ }
+
+ Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+ // Having no exchanges listed gives full publish rights to all exchanges
+ if (exchanges == null)
+ {
+ return AuthzResult.ALLOWED;
+ }
+ // Otherwise exchange must be listed in the white list
+
+ // If the map doesn't have the exchange then it isn't allowed
+ if (!exchanges.containsKey(((Exchange) parameters[0]).getName()))
+ {
+ return AuthzResult.DENIED;
+ }
+ else
+ {
+
+ // Get valid routing keys
+ HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName());
+
+ // Having no routingKeys in the map then all are allowed.
+ if (routingKeys == null)
+ {
+ return AuthzResult.ALLOWED;
+ }
+ else
+ {
+ // We have routingKeys so a match must be found to allowed binding
+ Iterator keys = routingKeys.iterator();
+
+
+ AMQShortString publishRKey = (AMQShortString)parameters[1];
+
+ boolean matched = false;
+ while (keys.hasNext() && !matched)
+ {
+ AMQShortString rkey = (AMQShortString) keys.next();
+
+ if (rkey.endsWith("*"))
+ {
+ matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1));
+ }
+ else
+ {
+ matched = publishRKey.equals(rkey);
+ }
+ }
+ return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ }
+ }
+
+ private AuthzResult authoriseCreateExchange(Permission permission, Object... parameters) {
+ Map rights = (Map) _permissions.get(permission);
+
+ AMQShortString exchangeName = (AMQShortString) parameters[0];
+
+ // If the exchange list is doesn't exist then all is allowed else
+ // check the valid exchanges
+ if (rights == null || rights.containsKey(exchangeName))
+ {
+ return AuthzResult.ALLOWED;
+ }
+ else
+ {
+ return AuthzResult.DENIED;
+ }
+ }
+
+ private AuthzResult authoriseCreateQueue(Permission permission, Object... parameters) {
+ Map createRights = (Map) _permissions.get(permission);
+
+ // If there are no create rights then deny request
+ if (createRights == null)
+ {
+ return AuthzResult.DENIED;
+ }
+
+ //Look up the Queue Creation Rights
+ Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+ //Lookup the list of queues allowed to be created
+ Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+
+ AMQShortString queueName = (AMQShortString) parameters[1];
+ Boolean autoDelete = (Boolean) parameters[0];
+
+ if (autoDelete)// we have a temporary queue
+ {
+ return ((Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ else
+ {
+ // If there is a white list then check
+ if (create_queues_queues == null || create_queues_queues.containsKey(queueName))
+ {
+ return AuthzResult.ALLOWED;
+ }
+ else
+ {
+ return AuthzResult.DENIED;
+ }
+
+ }
+ }
+
+ private AuthzResult authoriseBind(Object... parameters) {
+ Exchange exchange = (Exchange) parameters[1];
+
+ AMQQueue bind_queueName = (AMQQueue) parameters[2];
+ AMQShortString routingKey = (AMQShortString) parameters[3];
+
+ //Get all Create Rights for this user
+ Map bindCreateRights = (Map) _permissions.get(Permission.CREATEQUEUE);
+
+ //Look up the Queue Creation Rights
+ Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY);
+
+ //Lookup the list of queues
+ Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY);
+
+ // Check and see if we have a queue white list to check
+ if (bind_create_queues_queues != null)
+ {
+ //There a white list for queues
+ Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName);
+
+ if (exchangeDetails == null) //Then all queue can be bound to all exchanges.
+ {
+ return AuthzResult.ALLOWED;
+ }
+
+ // Check to see if we have a white list of routingkeys to check
+ Map rkeys = (Map) exchangeDetails.get(exchange.getName());
+
+ // if keys is null then any rkey is allowed on this exchange
+ if (rkeys == null)
+ {
+ // There is no routingkey white list
+ return AuthzResult.ALLOWED;
+ }
+ else
+ {
+ // We have routingKeys so a match must be found to allowed binding
+ Iterator keys = rkeys.keySet().iterator();
+
+ boolean matched = false;
+ while (keys.hasNext() && !matched)
+ {
+ AMQShortString rkey = (AMQShortString) keys.next();
+ if (rkey.endsWith("*"))
+ {
+ matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString());
+ }
+ else
+ {
+ matched = routingKey.equals(rkey);
+ }
+ }
+
+
+ return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+
+
+ }
+ else
+ {
+ //There a is no white list for queues
+
+ // So can allow all queues to be bound
+ // but we should first check and see if we have a temp queue and validate that we are allowed
+ // to bind temp queues.
+
+ //Check to see if we have a temporary queue
+ if (bind_queueName.isAutoDelete())
+ {
+ // Check and see if we have an exchange white list.
+ Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+ // If the exchange exists then we must check to see if temporary queues are allowed here
+ if (bind_exchanges != null)
+ {
+ // Check to see if the requested exchange is allowed.
+ Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName());
+
+ return ((Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+
+ //no white list so all allowed, drop through to return true below.
+ }
+
+ // not a temporary queue and no white list so all allowed.
+ return AuthzResult.ALLOWED;
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
index a6fae053c2..ca6b68dafa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
@@ -35,28 +35,24 @@ public abstract class BasicACLPlugin implements ACLPlugin
// Returns true or false if the plugin should authorise or deny the request
protected abstract AuthzResult getResult();
- @Override
public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch,
AMQQueue queue, AMQShortString routingKey)
{
return getResult();
}
- @Override
public AuthzResult authoriseConnect(AMQProtocolSession session,
VirtualHost virtualHost)
{
return getResult();
}
- @Override
public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck,
AMQQueue queue)
{
return getResult();
}
- @Override
public AuthzResult authoriseConsume(AMQProtocolSession session,
boolean exclusive, boolean noAck, boolean noLocal, boolean nowait,
AMQQueue queue)
@@ -64,7 +60,6 @@ public abstract class BasicACLPlugin implements ACLPlugin
return getResult();
}
- @Override
public AuthzResult authoriseCreateExchange(AMQProtocolSession session,
boolean autoDelete, boolean durable, AMQShortString exchangeName,
boolean internal, boolean nowait, boolean passive,
@@ -73,7 +68,6 @@ public abstract class BasicACLPlugin implements ACLPlugin
return getResult();
}
- @Override
public AuthzResult authoriseCreateQueue(AMQProtocolSession session,
boolean autoDelete, boolean durable, boolean exclusive,
boolean nowait, boolean passive, AMQShortString queue)
@@ -81,19 +75,16 @@ public abstract class BasicACLPlugin implements ACLPlugin
return getResult();
}
- @Override
public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
{
return getResult();
}
- @Override
public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
{
return getResult();
}
- @Override
public AuthzResult authorisePublish(AMQProtocolSession session,
boolean immediate, boolean mandatory, AMQShortString routingKey,
Exchange e)
@@ -101,20 +92,17 @@ public abstract class BasicACLPlugin implements ACLPlugin
return getResult();
}
- @Override
public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue)
{
return getResult();
}
- @Override
public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch,
AMQShortString routingKey, AMQQueue queue)
{
return getResult();
}
- @Override
public void setConfiguration(Configuration config)
{
// no-op
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
index 3a81932123..7450322130 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
@@ -211,7 +211,6 @@ public class FirewallPlugin extends AbstractACLPlugin
}
- @Override
public void setConfiguration(Configuration config) throws ConfigurationException
{
// Get default action
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
index 4efe381a8b..8658101cd8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
@@ -42,7 +42,6 @@ public class PropertiesPrincipalDatabaseManager implements PrincipalDatabaseMana
return _databases;
}
- @Override
public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException
{
//todo
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 72d6afc65c..2893e916cc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -630,11 +630,19 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public QueueEntry getLastSeenEntry()
{
- return _queueContext.get();
+ QueueEntry entry = _queueContext.get();
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(_logActor + ": lastSeenEntry: " + (entry == null ? "null" : entry.debugIdentity()));
+ }
+
+ return entry;
}
public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
{
+ _logger.debug(debugIdentity() + " Setting Last Seen To:" + (newvalue == null ? "nullNV" : newvalue.debugIdentity()));
return _queueContext.compareAndSet(expected,newvalue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 3c71282c57..450852cef7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -97,6 +97,7 @@ public class LocalTransactionalContext implements TransactionalContext
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+ _queue.checkCapacity(_channel);
if(entry.immediateAndNotDelivered())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 28af36e3db..10d6021d27 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -91,6 +91,8 @@ public class NonTransactionalContext implements TransactionalContext
public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
QueueEntry entry = queue.enqueue(_storeContext, message);
+ queue.checkCapacity(_channel);
+
//following check implements the functionality
//required by the 'immediate' flag:
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index b16a289f0a..a131c2a465 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
@@ -271,6 +272,15 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean getBlockOnQueueFull()
+ {
+ return false;
+ }
+
+ public void setBlockOnQueueFull(boolean block)
+ {
+ }
+
public long getMinimumAlertRepeatGap()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
@@ -285,8 +295,8 @@ public class MockAMQQueue implements AMQQueue
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
+
- @Override
public void checkMessageStatus() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -317,6 +327,10 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void checkCapacity(AMQChannel channel)
+ {
+ }
+
public ManagedObject getManagedObject()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -327,12 +341,31 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
public void setMinimumAlertRepeatGap(long value)
{
}
+ public long getCapacity()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setCapacity(long capacity)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void configure(QueueConfiguration config)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
index e75ed640aa..4c8ff01be0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.registry;
import junit.framework.TestCase;
import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.AMQException;
import java.security.Security;
import java.security.Provider;
@@ -69,7 +68,7 @@ public class ApplicationRegistryShutdownTest extends TestCase
// Register new providers
try
{
- _registry.initialise();
+ _registry.initialise(ApplicationRegistry.DEFAULT_INSTANCE);
}
catch (Exception e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 8fef8baa02..6b8201eefb 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.RootMessageLoggerImpl;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
@@ -50,7 +51,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
super(new ServerConfiguration(new PropertiesConfiguration()));
}
- public void initialise() throws Exception
+ public void initialise(int instanceID) throws Exception
{
_logger.info("Initialising NullApplicationRegistry");
@@ -92,6 +93,8 @@ public class NullApplicationRegistry extends ApplicationRegistry
@Override
public void close() throws Exception
{
+ CurrentActor.set(new BrokerActor(_rootMessageLogger));
+
try
{
super.close();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 43948c05c4..7b7c86bb80 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.util;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -31,7 +30,6 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
-import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
@@ -45,7 +43,6 @@ import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Properties;
import java.util.Arrays;
@@ -75,7 +72,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
_config = config;
}
- public void initialise() throws Exception
+ public void initialise(int instanceID) throws Exception
{
_rootMessageLogger = new RootMessageLoggerImpl(_configuration,
new Log4jMessageLogger());
diff --git a/qpid/java/client/src/main/java/log4j.xml b/qpid/java/client/src/main/java/log4j.xml
new file mode 100644
index 0000000000..c27acba818
--- /dev/null
+++ b/qpid/java/client/src/main/java/log4j.xml
@@ -0,0 +1,36 @@
+<!--
+
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+-->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-5p %c{1} - %m%n"/>
+ </layout>
+ </appender>
+
+ <logger name="org.apache.qpid">
+ <level value="warn"/>
+ <appender-ref ref="console" />
+ </logger>
+
+</log4j:configuration>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0d2adcec8a..ed122a772e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -313,7 +313,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
protected AMQConnectionDelegate _delegate;
// this connection maximum number of prefetched messages
- protected int _maxPrefetch;
+ private int _maxPrefetch;
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
@@ -450,7 +450,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else
{
- // use the defaul value set for all connections
+ // use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
@@ -512,7 +512,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
boolean retryAllowed = true;
Exception connectionException = null;
- while (!_connected && retryAllowed)
+ while (!_connected && retryAllowed && brokerDetails != null)
{
ProtocolVersion pe = null;
try
@@ -691,12 +691,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public boolean attemptReconnection()
{
- while (_failoverPolicy.failoverAllowed())
+ BrokerDetails broker = null;
+ while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
{
try
{
- makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
-
+ makeBrokerConnection(broker);
return true;
}
catch (Exception e)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index e5980d8b7d..e6c3473cb1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -39,6 +39,15 @@ public interface AMQConnectionDelegate
Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException;
+ /**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException thrown if there is a problem creating the session.
+ */
+ XASession createXASession() throws JMSException;
+
XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
void failoverPrep();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 927929c94a..4d10180667 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -100,6 +100,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
/**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException
+ */
+ public XASession createXASession() throws JMSException
+ {
+ return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+ }
+
+ /**
* create an XA Session and start it if required.
*/
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
@@ -285,7 +297,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_qpidConnection.setIdleTimeout(l);
}
- @Override
public int getMaxChannelID()
{
return Integer.MAX_VALUE;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 9876393d4c..97d0d0516e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -191,6 +191,18 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}, _conn).execute();
}
+ /**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException thrown if there is a problem creating the session.
+ */
+ public XASession createXASession() throws JMSException
+ {
+ return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+ }
+
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException, FailoverException
{
@@ -290,7 +302,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
public void setIdleTimeout(long l){}
- @Override
public int getMaxChannelID()
{
return (int) (Math.pow(2, 16)-1);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 2e3e417c95..dd9a00ce10 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -21,7 +21,6 @@
package org.apache.qpid.client;
import java.io.Serializable;
-import java.io.IOException;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -60,6 +59,7 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -113,7 +113,6 @@ import org.slf4j.LoggerFactory;
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
-
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -198,16 +197,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
*/
- protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+ protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by producers created by this session is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
- protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+ protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+ protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
+ /**
+ * The period to wait while flow controlled before sending a log message confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+ protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+ DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
protected final boolean DECLARE_QUEUES =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+
protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
@@ -244,10 +259,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
- private int _defaultPrefetchHighMark;
+ private int _prefetchHighMark;
/** Holds the low mark for prefetched messages, below which the session is resumed. */
- private int _defaultPrefetchLowMark;
+ private int _prefetchLowMark;
/** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
@@ -428,13 +443,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_channelId = channelId;
_messageFactoryRegistry = messageFactoryRegistry;
- _defaultPrefetchHighMark = defaultPrefetchHighMark;
- _defaultPrefetchLowMark = defaultPrefetchLowMark;
+ _prefetchHighMark = defaultPrefetchHighMark;
+ _prefetchLowMark = defaultPrefetchLowMark;
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue =
- new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+ new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
private final AtomicBoolean _suspendState = new AtomicBoolean();
@@ -442,7 +457,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void aboveThreshold(int currentValue)
{
_logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
+ "Above threshold(" + _prefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
_suspendState.set(true);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -452,7 +467,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void underThreshold(int currentValue)
{
_logger.debug(
- "Below threshold(" + _defaultPrefetchLowMark
+ "Below threshold(" + _prefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
_suspendState.set(false);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -462,7 +477,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+ _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
}
@@ -759,8 +774,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
+ {
+ rollback();
+
+ throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+ "Forced rollback");
+ }
+
- // TGM FIXME: what about failover?
// Acknowledge all delivered messages
while (true)
{
@@ -778,7 +801,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
@@ -870,7 +893,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false,
messageSelector, null, true, true);
}
@@ -878,7 +901,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
false, false);
}
@@ -886,7 +909,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
false, false);
}
@@ -894,7 +917,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -903,7 +926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -912,7 +935,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
messageSelector, null, false, false);
}
@@ -1336,17 +1359,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public int getDefaultPrefetch()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchHigh()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchLow()
{
- return _defaultPrefetchLowMark;
+ return _prefetchLowMark;
}
public AMQShortString getDefaultQueueExchangeName()
@@ -1491,6 +1514,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
sendRecover();
+ markClean();
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1559,6 +1584,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
suspendChannel(true);
}
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+
+ syncDispatchQueue();
+
+ _dispatcher.rollback();
+
releaseForRollback();
sendRollback();
@@ -1851,26 +1884,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void failoverPrep()
{
- startDispatcherIfNecessary();
syncDispatchQueue();
}
void syncDispatchQueue()
{
- final CountDownLatch signal = new CountDownLatch(1);
- _queue.add(new Dispatchable() {
- public void dispatch(AMQSession ssn)
+ if (Thread.currentThread() == _dispatcherThread)
+ {
+ while (!_closed.get() && !_queue.isEmpty())
{
- signal.countDown();
+ Dispatchable disp;
+ try
+ {
+ disp = (Dispatchable) _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // Check just in case _queue becomes empty, it shouldn't but
+ // better than an NPE.
+ if (disp == null)
+ {
+ _logger.debug("_queue became empty during sync.");
+ break;
+ }
+
+ disp.dispatch(AMQSession.this);
}
- });
- try
- {
- signal.await();
}
- catch (InterruptedException e)
+ else
{
- throw new RuntimeException(e);
+ startDispatcherIfNecessary();
+
+ final CountDownLatch signal = new CountDownLatch(1);
+
+ _queue.add(new Dispatchable()
+ {
+ public void dispatch(AMQSession ssn)
+ {
+ signal.countDown();
+ }
+ });
+
+ try
+ {
+ signal.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
@@ -2233,7 +2298,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, false);
+ return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
}
private P createProducerImpl(final Destination destination, final boolean mandatory,
@@ -2704,15 +2769,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
+ _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
}
- public void checkFlowControl() throws InterruptedException
+ public void checkFlowControl() throws InterruptedException, JMSException
{
+ long expiryTime = 0L;
synchronized (_flowControl)
{
- while (!_flowControl.getFlowControl())
+ while (!_flowControl.getFlowControl() &&
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+ : expiryTime) >= System.currentTimeMillis() )
+ {
+
+ _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
+ _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
+ if(!_flowControl.getFlowControl())
{
- _flowControl.wait();
+ _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+ throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 0644bd88a8..1587d6a6bf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -414,9 +414,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void releaseForRollback()
{
- startDispatcherIfNecessary();
- syncDispatchQueue();
- _dispatcher.rollback();
getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
_txRangeSet.clear();
_txSize = 0;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index d7196c0abb..bc1453beaf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -195,6 +195,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public void releaseForRollback()
{
+ // Reject all the messages that have been received in this session and
+ // have not yet been acknowledged. Should look to remove
+ // _deliveredMessageTags and use _txRangeSet as used by 0-10.
+ // Otherwise messages will be able to arrive out of order to a second
+ // consumer on the queue. Whilst this is within the JMS spec it is not
+ // user friendly and avoidable.
while (true)
{
Long tag = _deliveredMessageTags.poll();
@@ -205,11 +211,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
rejectMessage(tag, true);
}
-
- if (_dispatcher != null)
- {
- _dispatcher.rollback();
- }
}
public void rejectMessage(long deliveryTag, boolean requeue)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 2dfecc80ac..667785c441 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -779,6 +779,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
else
{
_session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
}
break;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 5ff6066ddc..44ce59975a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -60,7 +60,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
/**
* Priority of messages created by this producer.
*/
- private int _messagePriority;
+ private int _messagePriority = Message.DEFAULT_PRIORITY;
/**
* Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
index 20fa68605a..43025bd724 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
@@ -47,7 +47,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ
public synchronized XASession createXASession() throws JMSException
{
checkNotClosed();
- return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2);
+ return _delegate.createXASession();
}
//-- Interface XAQueueConnection
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
index 3627618e68..be0d283470 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -39,7 +39,7 @@ public class ClientProperties
* type: long
*/
public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch";
- public static final String MAX_PREFETCH_DEFAULT = "5000";
+ public static final String MAX_PREFETCH_DEFAULT = "500";
/**
* When true a sync command is sent after every persistent messages.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 8223cd5394..7761215450 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.failover;
import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
@@ -134,6 +135,7 @@ public class FailoverHandler implements Runnable
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
+ // Use a fresh new StateManager for the reconnection attempts
_amqProtocolHandler.setStateManager(new AMQStateManager());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index 9c791730ca..0e3333940a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -39,6 +39,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance();
private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
@@ -159,7 +160,8 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- return false;
+ _channelFlowMethodHandler.methodReceived(_session, body, channelId);
+ return true;
}
public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 06a1fe2696..e645f4f214 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -308,7 +308,6 @@ public class AMQProtocolHandler implements ProtocolEngine
*/
public void exception(Throwable cause)
{
- _logger.info("AS: HELLO");
if (_failoverState == FailoverState.NOT_STARTED)
{
// if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 67cda957fb..a3d015eadc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -253,7 +253,7 @@ public abstract class BlockingWaiter<T>
}
else
{
- System.err.println("WARNING: new error arrived while old one not yet processed");
+ System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
}
try
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
index e05a7ab6e2..960661daea 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
@@ -189,7 +189,8 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener
{
synchronized (_brokerListLock)
{
- return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+ _currentBrokerDetail = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+ return _currentBrokerDetail;
}
}
@@ -214,7 +215,15 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener
broker.getHost().equals(_currentBrokerDetail.getHost()) &&
broker.getPort() == _currentBrokerDetail.getPort())
{
- return getNextBrokerDetails();
+ if (_connectionDetails.getBrokerCount() > 1)
+ {
+ return getNextBrokerDetails();
+ }
+ else
+ {
+ _failedAttemps ++;
+ return null;
+ }
}
String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
diff --git a/qpid/java/common/bin/qpid-run b/qpid/java/common/bin/qpid-run
index 0b5070d937..63bb648fd8 100755
--- a/qpid/java/common/bin/qpid-run
+++ b/qpid/java/common/bin/qpid-run
@@ -111,6 +111,7 @@ if [ -n "$QPID_LOG_SUFFIX" ]; then
fi
log $INFO System Properties set to $SYSTEM_PROPS
+log $INFO QPID_OPTS set to $QPID_OPTS
program=$(basename $0)
sourced=${BASH_SOURCE[0]}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java b/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
index 3c1ea22595..7d8a5b7b36 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
@@ -51,7 +51,6 @@ public class ConsoleOutput implements Sender<ByteBuffer>
System.out.println("CLOSED");
}
- @Override
public void setIdleTimeout(long l)
{
// TODO Auto-generated method stub
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
index 98dba9fed1..cb3387c6d3 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
@@ -110,7 +110,7 @@ public interface LoggingManagement
* Reloads the log4j configuration file, applying any changes made.
*
* @throws IOException
- * @since Qpid JMX API 1.3
+ * @since Qpid JMX API 1.4
*/
@MBeanOperation(name = "reloadConfigFile", description = "Reload the log4j xml configuration file", impact = MBeanOperationInfo.ACTION)
void reloadConfigFile() throws IOException;
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
index c158c26857..5819631dba 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
@@ -43,7 +43,7 @@ public interface ServerInformation
* Qpid JMX API 1.1 can be assumed.
*/
int QPID_JMX_API_MAJOR_VERSION = 1;
- int QPID_JMX_API_MINOR_VERSION = 3;
+ int QPID_JMX_API_MINOR_VERSION = 4;
/**
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
index cd6f7ff808..9259d36d79 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
@@ -48,7 +48,7 @@ public abstract class ApplicationRegistry
//max supported broker management interface supported by this release of the management console
public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 1;
- public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 3;
+ public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 4;
public static final String DATA_DIR = System.getProperty("user.home") + File.separator + ".qpidmc";
diff --git a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java
index 1b1d08aa67..536033368f 100644
--- a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java
+++ b/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java
@@ -349,7 +349,7 @@ public class ConfigurationFileTabControl extends TabControl
_logWatchIntervalLabel.setFont(ApplicationRegistry.getFont(FONT_BOLD));
_logWatchIntervalLabel.setLayoutData(new GridData(SWT.LEFT, SWT.CENTER, false, true));
- if(_ApiVersion.greaterThanOrEqualTo(1, 3))
+ if(_ApiVersion.greaterThanOrEqualTo(1, 4))
{
Group reloadConfigFileGroup = new Group(attributesComposite, SWT.SHADOW_NONE);
reloadConfigFileGroup.setBackground(attributesComposite.getBackground());
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 0c32414647..5796af928a 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -261,6 +261,7 @@
<jvmarg value="${jvm.args}"/>
<sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
+ <sysproperty key="amqj.server.logging.level" value="${amqj.server.logging.level}"/>
<sysproperty key="amqj.protocol.logging.level" value="${amqj.protocol.logging.level}"/>
<sysproperty key="log4j.debug" value="${log4j.debug}"/>
<sysproperty key="root.logging.level" value="${root.logging.level}"/>
@@ -269,6 +270,7 @@
<sysproperty key="java.naming.provider.url" value="${java.naming.provider.url}"/>
<sysproperty key="broker" value="${broker}"/>
<sysproperty key="broker.clean" value="${broker.clean}"/>
+ <sysproperty key="broker.clean.between.tests" value="${broker.clean.between.tests}"/>
<sysproperty key="broker.version" value="${broker.version}"/>
<sysproperty key="broker.ready" value="${broker.ready}" />
<sysproperty key="broker.stopped" value="${broker.stopped}" />
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
index 52120019f5..e7975f8d24 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
@@ -78,15 +78,23 @@ public class BrokerStartupTest extends AbstractTestLogging
// Add an invalid value
_broker += " -l invalid";
- // The release-bin build of the broker uses this log4j configuration
- // so set up the broker environment to use it for this test.
- // Also include -Dlog4j.debug so we can validate that it picked up this config
- setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug -Dlog4j.configuration=file:" + System.getProperty(QPID_HOME) + "/../broker/src/main/java/log4j.properties");
+ // The broker has a built in default log4j configuration set up
+ // so if the the broker cannot load the -l value it will use default
+ // use this default. Test that this is correctly loaded, by
+ // including -Dlog4j.debug so we can validate.
+ setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug");
// Disable all client logging so we can test for broker DEBUG only.
- Logger.getRootLogger().setLevel(Level.WARN);
- Logger.getLogger("qpid.protocol").setLevel(Level.WARN);
- Logger.getLogger("org.apache.qpid").setLevel(Level.WARN);
+ setLoggerLevel(Logger.getRootLogger(), Level.WARN);
+ setLoggerLevel(Logger.getLogger("qpid.protocol"), Level.WARN);
+ setLoggerLevel(Logger.getLogger("org.apache.qpid"), Level.WARN);
+
+ // Set the broker to use info level logging, which is the qpid-server
+ // default. Rather than debug which is the test default.
+ setBrokerOnlySystemProperty("amqj.server.logging.level", "info");
+ // Set the logging defaults to info for this test.
+ setBrokerOnlySystemProperty("amqj.logging.level", "info");
+ setBrokerOnlySystemProperty("root.logging.level", "info");
startBroker();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
new file mode 100644
index 0000000000..f1a1c1a9a8
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
@@ -0,0 +1,330 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.failover;
+
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+import org.apache.qpid.AMQConnectionClosedException;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test case based on user reported error.
+ *
+ * Summary:
+ * A user has reported message loss from their application. On bouncing of
+ * the broker the 'lost' messages are delivered to the broker.
+ *
+ * Note:
+ * The client was using Spring so that may influence the situation.
+ *
+ * Issue:
+ * The log files show 7 instances of the following which result in 7
+ * missing messages.
+ *
+ * The client log files show:
+ *
+ * The broker log file show:
+ *
+ *
+ * 7 missing messages have delivery tags 5-11. Which says that they are
+ * sequentially the next message from the broker.
+ *
+ * The only way for the 'without a handler' log to occur is if the consumer
+ * has been removed from the look up table of the dispatcher.
+ * And the only way for the 'null message' log to occur on the broker is is
+ * if the message does not exist in the unacked-map
+ *
+ * The consumer is only removed from the list during session
+ * closure and failover.
+ *
+ * If the session was closed then the broker would requeue the unacked
+ * messages so the potential exists to have an empty map but the broker
+ * will not send a message out after the unacked map has been cleared.
+ *
+ * When failover occurs the _consumer map is cleared and the consumers are
+ * resubscribed. This is down without first stopping any existing
+ * dispatcher so there exists the potential to receive a message after
+ * the _consumer map has been cleared which is how the 'without a handler'
+ * log statement occurs.
+ *
+ * Scenario:
+ *
+ * Looking over logs the sequence that best fits the events is as follows:
+ * - Something causes Mina to be delayed causing the WriteTimoutException.
+ * - This exception is recevied by AMQProtocolHandler#exceptionCaught
+ * - As the WriteTimeoutException is an IOException this will cause
+ * sessionClosed to be called to start failover.
+ * + This is potentially the issues here. All IOExceptions are treated
+ * as connection failure events.
+ * - Failover Runs
+ * + Failover assumes that the previous connection has been closed.
+ * + Failover binds the existing objects (AMQConnection/Session) to the
+ * new connection objects.
+ * - Everything is reported as being successfully failed over.
+ * However, what is neglected is that the original connection has not
+ * been closed.
+ * + So what occurs is that the broker sends a message to the consumer on
+ * the original connection, as it was not notified of the client
+ * failing over.
+ * As the client failover reuses the original AMQSession and Dispatcher
+ * the new messages the broker sends to the old consumer arrives at the
+ * client and is processed by the same AMQSession and Dispatcher.
+ * However, as the failover process cleared the _consumer map and
+ * resubscribe the consumers the Dispatcher does not recognise the
+ * delivery tag and so logs the 'without a handler' message.
+ * - The Dispatcher then attempts to reject the message, however,
+ * + The AMQSession/Dispatcher pair have been swapped to using a new Mina
+ * ProtocolSession as part of the failover process so the reject is
+ * sent down the second connection. The broker receives the Reject
+ * request but as the Message was sent on a different connection the
+ * unacknowledgemap is empty and a 'message is null' log message
+ * produced.
+ *
+ * Test Strategy:
+ *
+ * It should be easy to demonstrate if we can send an IOException to
+ * AMQProtocolHandler#exceptionCaught and then try sending a message.
+ *
+ * The current unknowns here are the type of consumers that are in use.
+ * If it was an exclusive queue(Durable Subscription) then why did the
+ * resubscribe not fail.
+ *
+ * If it was not exclusive then why did the messages not round robin?
+ */
+public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener
+{
+ private CountDownLatch _failoverOccured = new CountDownLatch(1);
+ AMQConnection _connection;
+ Session _session;
+ Queue _queue;
+ MessageConsumer _consumer;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ stopBroker(getFailingPort());
+
+ }
+
+ /**
+ * Test Summary:
+ *
+ * Create a queue consumer and send 10 messages to the broker.
+ *
+ * Consume the first message.
+ * This will pull the rest into the prefetch
+ *
+ * Send an IOException to the MinaProtocolHandler.
+ *
+ * This will force failover to occur.
+ *
+ * 9 messages would normally be expected but it is expected that none will
+ * arrive. As they are still in the prefetch of the first session.
+ *
+ * To free the messages we need to close all connections.
+ * - Simply doing connection.close() and retesting will not be enough as
+ * the original connection's IO layer will still exist and is nolonger
+ * connected to the connection object as a result of failover.
+ *
+ * - Test will need to retain a reference to the original connection IO so
+ * that it can be closed releasing the messages to validate that the
+ * messages have indeed been 'lost' on that sesssion.
+ */
+ public void test() throws Exception
+ {
+ initialiseConnection();
+
+ // Create Producer
+ // Send 10 messages
+ List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10);
+
+ // Consume first messasge
+ Message received = _consumer.receive(2000);
+
+ // Verify received messages
+ assertNotNull("First message not received.", received);
+ assertEquals("Incorrect message Received",
+ messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+
+ // Allow ack to be sent to broker, by performing a synchronous command
+ // along the session.
+// _session.createConsumer(_session.createTemporaryQueue()).close();
+
+ //Retain IO Layer
+ AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession();
+
+ // Send IO Exception - causing failover
+ _connection.getProtocolHandler().
+ exception(new WriteTimeoutException("WriteTimeoutException to cause failover."));
+
+ // Verify Failover occured through ConnectionListener
+ assertTrue("Failover did not occur",
+ _failoverOccured.await(4000, TimeUnit.MILLISECONDS));
+
+ //Verify new protocolSession is not the same as the original
+ assertNotSame("Protocol Session has not changed",
+ protocolSession,
+ _connection.getProtocolHandler().getProtocolSession());
+
+ /***********************************/
+ // This verifies that the bug has been resolved
+
+ // Attempt to consume again. Expect 9 messages
+ for (int count = 1; count < 10; count++)
+ {
+ received = _consumer.receive(2000);
+ assertNotNull("Expected message not received:" + count, received);
+ assertEquals(messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+ }
+
+ //Verify there are no more messages
+ received = _consumer.receive(1000);
+ assertNull("Message receieved when there should be none:" + received,
+ received);
+
+// /***********************************/
+// // This verifies that the bug exists
+//
+// // Attempt to consume remaining 9 messages.. Expecting NONE.
+// // receiving just one message should fail so no need to fail 9 times
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when it should be null:" + received, received);
+//
+//// //Close the Connection which you would assume would free the messages
+//// _connection.close();
+////
+//// // Reconnect
+//// initialiseConnection();
+////
+//// // We should still be unable to receive messages
+//// received = _consumer.receive(1000);
+//// assertNull("Message receieved when it should be null:" + received, received);
+////
+//// _connection.close();
+//
+// // Close original IO layer. Expecting messages to be released
+// protocolSession.closeProtocolSession();
+//
+// // Reconnect and all should be good.
+//// initialiseConnection();
+//
+// // Attempt to consume again. Expect 9 messages
+// for (int count = 1; count < 10; count++)
+// {
+// received = _consumer.receive(2000);
+// assertNotNull("Expected message not received:" + count, received);
+// assertEquals(messages.remove(0).getIntProperty("count"),
+// received.getIntProperty("count"));
+// }
+//
+// //Verify there are no more messages
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when there should be none:" + received,
+// received);
+ }
+
+ private void initialiseConnection()
+ throws Exception
+ {
+ //Create Connection
+ _connection = (AMQConnection) getConnection();
+ _connection.setConnectionListener(this);
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _queue = _session.createQueue(getTestQueueName());
+
+ // Create Consumer
+ _consumer = _session.createConsumer(_queue);
+
+ //Start connection
+ _connection.start();
+ }
+
+ /** QpidTestCase back port to this release */
+
+ // modified from QTC as sendMessage is not testable.
+ // - should be renamed sendBlankBytesMessage
+ // - should be renamed sendNumberedBytesMessage
+ public List<Message> sendNumberedBytesMessage(Session session, Destination destination,
+ int count) throws Exception
+ {
+ List<Message> messages = new ArrayList<Message>(count);
+
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < count; i++)
+ {
+ Message next = session.createMessage();
+
+ next.setIntProperty("count", i);
+
+ producer.send(next);
+
+ messages.add(next);
+ }
+
+ producer.close();
+ return messages;
+ }
+
+ public void bytesSent(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverOccured.countDown();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
index 4f50aba61d..b00a71315e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
@@ -160,7 +160,7 @@ public class BrokerLoggingTest extends AbstractTestLogging
// Set the broker.ready string to check for the _log4j default that
// is still present on standard out.
- System.setProperty(BROKER_READY, "Qpid Broker Ready");
+ setTestClientSystemProperty(BROKER_READY, "Qpid Broker Ready");
startBroker();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
index dfb5cde247..83f0f87bc5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
@@ -63,7 +63,6 @@ import java.util.concurrent.TimeUnit;
*/
public class DeepQueueConsumeWithSelector extends QpidTestCase implements MessageListener
{
- private static final String INDEX = "index";
private static final int MESSAGE_COUNT = 10000;
private static final int BATCH_SIZE = MESSAGE_COUNT / 10;
@@ -129,9 +128,7 @@ public class DeepQueueConsumeWithSelector extends QpidTestCase implements Messag
@Override
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
- Message message = session.createTextMessage("Message :" + msgCount);
-
- message.setIntProperty(INDEX, msgCount);
+ Message message = super.createNextMessage(session,msgCount);
if ((msgCount % BATCH_SIZE) == 0 )
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
new file mode 100644
index 0000000000..f220760511
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -0,0 +1,393 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.IOException;
+
+public class ProducerFlowControlTest extends AbstractTestLogging
+{
+ private static final int TIMEOUT = 1500;
+
+
+ private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
+
+ protected final String QUEUE = "ProducerFlowControl";
+
+ private static final int MSG_COUNT = 50;
+
+ private Connection producerConnection;
+ private MessageProducer producer;
+ private Session producerSession;
+ private Queue queue;
+ private Connection consumerConnection;
+ private Session consumerSession;
+
+
+ private MessageConsumer consumer;
+ private final AtomicInteger _sentMessages = new AtomicInteger();
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _monitor.reset();
+
+ producerConnection = getConnection();
+ producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ producerConnection.start();
+
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ producerConnection.close();
+ consumerConnection.close();
+ super.tearDown();
+ }
+
+ public void testCapacityExceededCausesBlock()
+ throws JMSException, NamingException, AMQException, InterruptedException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",800);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get());
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message not sent after two messages received", 5, _sentMessages.get());
+
+ }
+
+ public void testBrokerLogMessages()
+ throws JMSException, NamingException, AMQException, InterruptedException, IOException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",800);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+ List<String> results = _monitor.findMatches("QUE-1003");
+
+ assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size());
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+
+ while(consumer.receive(1000) != null);
+
+ results = _monitor.findMatches("QUE-1004");
+
+ assertEquals("Did not find correct number of QUE_1004 queue underfull messages", 1, results.size());
+
+
+
+ }
+
+
+ public void testClientLogMessages()
+ throws JMSException, NamingException, AMQException, InterruptedException, IOException
+ {
+ setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
+ setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
+
+ Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",800);
+ ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).declareAndBind((AMQDestination)queue);
+ producer = session.createProducer(queue);
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(10000);
+ List<String> results = _monitor.findMatches("Message send delayed by");
+ assertEquals("Incorrect number of delay messages logged by client",3,results.size());
+ results = _monitor.findMatches("Message send failed due to timeout waiting on broker enforced flow control");
+ assertEquals("Incorrect number of send failure messages logged by client",1,results.size());
+
+
+
+ }
+
+
+ public void testFlowControlOnCapacityResumeEqual()
+ throws JMSException, NamingException, AMQException, InterruptedException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",1000);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ _sentMessages.set(0);
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get());
+
+
+ }
+
+
+ public void testFlowControlSoak()
+ throws Exception, NamingException, AMQException, InterruptedException
+ {
+ _sentMessages.set(0);
+ final int numProducers = 10;
+ final int numMessages = 100;
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",6000);
+ arguments.put("x-qpid-flow-resume-capacity",3000);
+
+ ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
+
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
+ consumerConnection.start();
+
+ Connection[] producers = new Connection[numProducers];
+ for(int i = 0 ; i < numProducers; i ++)
+ {
+
+ producers[i] = getConnection();
+ producers[i].start();
+ Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer myproducer = session.createProducer(queue);
+ MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L);
+ }
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ for(int j = 0; j < numProducers * numMessages; j++)
+ {
+
+ Message msg = consumer.receive(5000);
+ Thread.sleep(50L);
+ assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg);
+
+ }
+
+
+
+ Message msg = consumer.receive(500);
+ assertNull("extra message received", msg);
+
+
+ for(int i = 0; i < numProducers; i++)
+ {
+ producers[i].close();
+ }
+
+ }
+
+
+
+ public void testSendTimeout()
+ throws JMSException, NamingException, AMQException, InterruptedException
+ {
+ setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
+ Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",800);
+ ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).declareAndBind((AMQDestination)queue);
+ producer = session.createProducer(queue);
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(10000);
+
+ Exception e = sender.getException();
+
+ assertNotNull("No timeout exception on sending", e);
+
+ }
+
+ private MessageSender sendMessagesAsync(final MessageProducer producer,
+ final Session producerSession,
+ final int numMessages,
+ long sleepPeriod)
+ {
+ MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod);
+ new Thread(sender).start();
+ return sender;
+ }
+
+ private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+ throws JMSException
+ {
+
+ for (int msg = 0; msg < numMessages; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ _sentMessages.incrementAndGet();
+
+ try
+ {
+ Thread.sleep(sleepPeriod);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+
+ private static final byte[] BYTE_300 = new byte[300];
+
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_300);
+ send.setIntProperty("msg", msg);
+
+ return send;
+ }
+
+
+ private class MessageSender implements Runnable
+ {
+ private final MessageProducer _producer;
+ private final Session _producerSession;
+ private final int _numMessages;
+
+
+
+ private JMSException _exception;
+ private long _sleepPeriod;
+
+ public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+ {
+ _producer = producer;
+ _producerSession = producerSession;
+ _numMessages = numMessages;
+ _sleepPeriod = sleepPeriod;
+ }
+
+ public void run()
+ {
+ try
+ {
+ sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod);
+ }
+ catch (JMSException e)
+ {
+ _exception = e;
+ }
+ }
+
+ public JMSException getException()
+ {
+ return _exception;
+ }
+ }
+} \ No newline at end of file
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
index bb7b5efc75..3e5470d5cb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
@@ -21,27 +21,34 @@
package org.apache.qpid.server.security.acl;
-
-import junit.framework.TestCase;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.*;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.naming.NamingException;
-
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -54,6 +61,14 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void setUp() throws Exception
{
+ //Performing setUp here would result in a broker with the default ACL test config
+
+ //Each test now calls the private setUpACLTest to allow them to make
+ //individual customisations to the base ACL settings
+ }
+
+ private void setUpACLTest() throws Exception
+ {
final String QPID_HOME = System.getProperty("QPID_HOME");
if (QPID_HOME == null)
@@ -73,8 +88,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
return "amqp://" + username + ":" + password + "@clientid/test?brokerlist='" + getBroker() + "?retries='0''";
}
- public void testAccessAuthorized() throws AMQException, URLSyntaxException
+ public void testAccessAuthorized() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -96,6 +113,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testAccessNoRights() throws Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("guest", "guest");
@@ -120,8 +139,40 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
}
}
- public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException
+ public void testGuestConsumeWithCreateRightsAndWithoutConsumeRights() throws NamingException, ConfigurationException, IOException, Exception
+ {
+ //Customise the ACL config to give the guest user some create (could be any, non-consume) rights to
+ //force creation of a PrincipalPermissions instance to perform the consume rights check against.
+ setConfigurationProperty("virtualhosts.virtualhost.test.security.access_control_list.create.queues.queue.users.user", "guest");
+
+ setUpACLTest();
+
+ try
+ {
+ Connection conn = getConnection("guest", "guest");
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sesh.createConsumer(sesh.createQueue("example.RequestQueue"));
+
+ conn.close();
+ }
+ catch (JMSException e)
+ {
+ Throwable cause = e.getLinkedException();
+
+ assertNotNull("There was no liked exception", cause);
+ assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
+ assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+ }
+ }
+
+ public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -142,6 +193,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -167,8 +220,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
}
}
- public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException
+ public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -191,6 +246,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -212,8 +269,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
}
}
- public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException
+ public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -239,8 +298,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
}
}
- public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException
+ public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -271,6 +332,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -311,8 +374,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
}
- public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException
+ public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -333,6 +398,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("client", "guest");
@@ -358,6 +425,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -391,8 +460,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
return (Connection) connection;
}
- public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException
+ public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -414,6 +485,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -436,6 +509,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
@@ -461,6 +536,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception
{
+ setUpACLTest();
+
Connection connection = null;
try
{
@@ -492,6 +569,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
*/
public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
+ setUpACLTest();
+
//Set up the Server
Connection serverConnection = getConnection("server", "guest");
@@ -572,6 +651,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
+ setUpACLTest();
+
try
{
Connection conn = getConnection("server", "guest");
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index 62b54d3086..f9cf48a2b1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -61,7 +61,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
setupSession();
- _queue = _clientSession.createQueue(getName()+System.currentTimeMillis());
+ _queue = _clientSession.createQueue(getTestQueueName());
_clientSession.createConsumer(_queue).close();
//Ensure there are no messages on the queue to start with.
@@ -497,7 +497,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
if (msgCount == failPoint)
{
- failBroker();
+ failBroker(getFailingPort());
}
}
@@ -529,7 +529,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
sendMessages("connection2", messages);
}
- failBroker();
+ failBroker(getFailingPort());
checkQueueDepth(messages);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
index 39e2b892a9..ff766c907d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
@@ -22,64 +22,172 @@ package org.apache.qpid.test.client;
import org.apache.qpid.test.utils.*;
import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import junit.framework.ComparisonFailure;
+import junit.framework.AssertionFailedError;
/**
- * RollbackOrderTest
+ * RollbackOrderTest, QPID-1864, QPID-1871
+ *
+ * Description:
+ *
+ * The problem that this test is exposing is that the dispatcher used to be capable
+ * of holding on to a message when stopped. This ment that when the rollback was
+ * called and the dispatcher stopped it may have hold of a message. So after all
+ * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
+ * have been cleared the client still had a single message, the one the
+ * dispatcher was holding on to.
+ *
+ * As a result the TxRollback operation would run and then release the dispatcher.
+ * Whilst the dispatcher would then proceed to reject the message it was holiding
+ * the Broker would already have resent that message so the rejection would silently
+ * fail.
+ *
+ * And the client would receieve that single message 'early', depending on the
+ * number of messages already recevied when rollback was called.
+ *
+ *
+ * Aims:
+ *
+ * The tests puts 50 messages on to the queue.
+ *
+ * The test then tries to cause the dispatcher to stop whilst it is in the process
+ * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
+ *
+ * To exercise this path we have 50 message flowing to the client to give the
+ * dispatcher a bit of work to do moving messages.
+ *
+ * Then we loop - 10 times
+ * - Validating that the first message received is always message 1.
+ * - Receive a few more so that there are a few messages to reject.
+ * - call rollback, to try and catch the dispatcher mid process.
+ *
+ * Outcome:
+ *
+ * The hope is that we catch the dispatcher mid process and cause a BasicReject
+ * to fail. Which will be indicated in the log but will also cause that failed
+ * rejected message to be the next to be delivered which will not be message 1
+ * as expected.
+ *
+ * We are testing a race condition here but we can check through the log file if
+ * the race condition occured. However, performing that check will only validate
+ * the problem exists and will not be suitable as part of a system test.
*
*/
-
public class RollbackOrderTest extends QpidTestCase
{
- private Connection conn;
- private Queue queue;
- private Session ssn;
- private MessageProducer prod;
- private MessageConsumer cons;
+ private Connection _connection;
+ private Queue _queue;
+ private Session _session;
+ private MessageConsumer _consumer;
@Override public void setUp() throws Exception
{
super.setUp();
- conn = getConnection();
- conn.start();
- ssn = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- queue = ssn.createQueue("rollback-order-test-queue");
- prod = ssn.createProducer(queue);
- cons = ssn.createConsumer(queue);
- for (int i = 0; i < 5; i++)
- {
- TextMessage msg = ssn.createTextMessage("message " + (i+1));
- prod.send(msg);
- }
- ssn.commit();
+ _connection = getConnection();
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queue = _session.createQueue(getTestQueueName());
+ _consumer = _session.createConsumer(_queue);
+
+ //Send more messages so it is more likely that the dispatcher is
+ // processing on rollback.
+ sendMessage(_session, _queue, 50);
+ _session.commit();
+
}
public void testOrderingAfterRollback() throws Exception
{
- for (int i = 0; i < 10; i++)
+ //Start the session now so we
+ _connection.start();
+
+ for (int i = 0; i < 20; i++)
{
- TextMessage msg = (TextMessage) cons.receive();
- assertEquals("message 1", msg.getText());
- ssn.rollback();
+ Message msg = _consumer.receive();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ // Pull additional messages through so we have some reject work to do
+ for (int m=0; m < 5 ; m++)
+ {
+ _consumer.receive();
+ }
+
+ System.err.println("ROT-Rollback");
+ _logger.warn("ROT-Rollback");
+ _session.rollback();
}
}
- @Override public void tearDown() throws Exception
+ public void testOrderingAfterRollbackOnMessage() throws Exception
{
- while (true)
+ final CountDownLatch count= new CountDownLatch(20);
+ final Exception exceptions[] = new Exception[20];
+ final AtomicBoolean failed = new AtomicBoolean(false);
+
+ _consumer.setMessageListener(new MessageListener()
{
- Message msg = cons.receiveNoWait();
- if (msg == null)
+
+ public void onMessage(Message message)
{
- break;
+
+ Message msg = message;
+ try
+ {
+ count.countDown();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ _session.rollback();
+ }
+ catch (JMSException e)
+ {
+ System.out.println("Error:" + e.getMessage());
+ exceptions[(int)count.getCount()] = e;
+ }
+ catch (AssertionFailedError cf)
+ {
+ // End Test if Equality test fails
+ while (count.getCount() != 0)
+ {
+ count.countDown();
+ }
+
+ System.out.println("Error:" + cf.getMessage());
+ System.err.println(cf.getMessage());
+ cf.printStackTrace();
+ failed.set(true);
+ }
}
- else
+ });
+ //Start the session now so we
+ _connection.start();
+
+ count.await();
+
+ for (Exception e : exceptions)
+ {
+ if (e != null)
{
- msg.acknowledge();
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ failed.set(true);
}
}
- ssn.commit();
+
+// _consumer.close();
+ _connection.close();
+
+ assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
+ }
+
+ @Override public void tearDown() throws Exception
+ {
+
+ drainQueue(_queue);
+
super.tearDown();
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index dfc3bb7b42..c307176f3f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -37,7 +37,6 @@ import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
@@ -58,13 +57,12 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
private Session consumerSession;
private MessageConsumer consumer;
- private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
- private static final long DEFAULT_FAILOVER_TIME = 10000L;
private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
private int seed;
private Random rand;
-
+ private int _currentPort = getFailingPort();
+
@Override
protected void setUp() throws Exception
{
@@ -227,7 +225,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.info("Failing over");
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(_currentPort, DEFAULT_FAILOVER_TIME);
// Check that you produce and consume the rest of messages.
_logger.debug("==================");
@@ -242,10 +240,10 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.debug("==================");
}
- private void causeFailure(long delay)
+ private void causeFailure(int port, long delay)
{
- failBroker();
+ failBroker(port);
_logger.info("Awaiting Failover completion");
try
@@ -268,7 +266,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
@@ -314,7 +312,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
//Fail the first broker
- causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
//Reconnection should occur
assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
@@ -344,15 +342,15 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.debug("===================================================================");
runP2PFailover(numMessages, false,false, false);
- startBroker(getFailingPort());
+ startBroker(_currentPort);
if (useAltPort)
{
- setFailingPort(altPort);
+ _currentPort = altPort;
useAltPort = false;
}
else
{
- setFailingPort(stdPort);
+ _currentPort = stdPort;
useAltPort = true;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
index 5a5e23baa5..a09589b121 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -1,31 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.test.client.message;
-import javax.jms.Connection;
-import javax.jms.Destination;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Assert;
-import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class SelectorTest extends QpidTestCase
+public class SelectorTest extends QpidTestCase implements MessageListener
{
- private static final Logger _logger = Logger.getLogger(SelectorTest.class);
+ private static final Logger _logger = LoggerFactory.getLogger(SelectorTest.class);
- public void testSelectorWithJMSMessageID() throws Exception
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private int count;
+ public String _connectionString = "vm://:1";
+ private static final String INVALID_SELECTOR = "Cost LIKE 5";
+ CountDownLatch _responseLatch = new CountDownLatch(1);
+
+ private static final String BAD_MATHS_SELECTOR = " 1 % 5";
+
+ private static final long RECIEVE_TIMEOUT = 1000;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+
+ private void init(AMQConnection connection) throws JMSException
+ {
+ init(connection, new AMQQueue(connection, getTestQueueName(), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws JMSException
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+ }
+
+ public void onMessage(Message message)
+ {
+ count++;
+ _logger.info("Got Message:" + message);
+ _responseLatch.countDown();
+ }
+
+ public void testUsingOnMessage() throws Exception
+ {
+ String selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
+ // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+
+ Session session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ // _session.createConsumer(destination).setMessageListener(this);
+ session.createConsumer(_destination, selector).setMessageListener(this);
+
+ try
+ {
+ Message msg = session.createTextMessage("Message");
+ msg.setJMSPriority(1);
+ msg.setIntProperty("Cost", 2);
+ msg.setStringProperty("property-with-hyphen", "wibble");
+ msg.setJMSType("Special");
+
+ _logger.info("Sending Message:" + msg);
+
+ ((BasicMessageProducer) session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
+ _logger.info("Message sent, waiting for response...");
+
+ _responseLatch.await();
+
+ if (count > 0)
+ {
+ _logger.info("Got message");
+ }
+
+ if (count == 0)
+ {
+ fail("Did not get message!");
+ // throw new RuntimeException("Did not get message!");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ System.out.println("SUCCESS!!");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ }
+
+ }
+
+ public void testUnparsableSelectors() throws Exception
{
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ AMQSession session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ boolean caught = false;
- Destination dest = session.createQueue("SelectorQueue");
+ //Try Creating a Browser
+ try
+ {
+ session.createBrowser(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Consumer
+ try
+ {
+ session.createConsumer(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Receiever
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), BAD_MATHS_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
- MessageProducer prod = session.createProducer(dest);
- MessageConsumer consumer = session.createConsumer(dest,"JMSMessageID IS NOT NULL");
+ }
+
+ public void testRuntimeSelectorError() throws JMSException
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1");
+ MessageProducer producer = session.createProducer(_destination);
+ Message sentMsg = session.createTextMessage();
+
+ sentMsg.setIntProperty("testproperty", 1); // 1 % 5
+ producer.send(sentMsg);
+ Message recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNotNull(recvd);
+
+ sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense
+ producer.send(sentMsg);
+ try
+ {
+ recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNull(recvd);
+ }
+ catch (Exception e)
+ {
+
+ }
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ public void testSelectorWithJMSMessageID() throws Exception
+ {
+ Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = session.createProducer(_destination);
+ MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL");
for (int i=0; i<2; i++)
{
@@ -54,7 +271,7 @@ public class SelectorTest extends QpidTestCase
Message msg3 = consumer.receive(1000);
Assert.assertNull("Msg3 should be null", msg3);
session.commit();
- consumer = session.createConsumer(dest,"JMSMessageID IS NULL");
+ consumer = session.createConsumer(_destination,"JMSMessageID IS NULL");
Message msg4 = consumer.receive(1000);
Message msg5 = consumer.receive(1000);
@@ -62,4 +279,32 @@ public class SelectorTest extends QpidTestCase
Assert.assertNotNull("Msg4 should not be null", msg4);
Assert.assertNotNull("Msg5 should not be null", msg5);
}
+
+ public static void main(String[] argv) throws Exception
+ {
+ SelectorTest test = new SelectorTest();
+ test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+
+ try
+ {
+ while (true)
+ {
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.setUp();
+ }
+ test.testUsingOnMessage();
+
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.tearDown();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
new file mode 100644
index 0000000000..4b45a96c20
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+public class Acknowledge2ConsumersTest extends FailoverBaseCase
+{
+ protected static int NUM_MESSAGES = 100;
+ protected Connection _con;
+ protected Queue _queue;
+ private Session _producerSession;
+ private Session _consumerSession;
+ private MessageConsumer _consumerA;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queue = (Queue) getInitialContext().lookup("queue");
+
+ //Create Producer put some messages on the queue
+ _con = getConnection();
+ }
+
+ private void init(boolean transacted, int mode) throws JMSException
+ {
+ _producerSession = _con.createSession(true, Session.SESSION_TRANSACTED);
+ _consumerSession = _con.createSession(transacted, mode);
+ _consumerA = _consumerSession.createConsumer(_queue);
+ _con.start();
+ }
+
+ /**
+ * Produces Messages that
+ *
+ * @param transacted
+ * @param mode
+ *
+ * @throws Exception
+ */
+ private void test2ConsumersAcking(boolean transacted, int mode) throws Exception
+ {
+ init(transacted, mode);
+
+ // These should all end up being prefetched by sessionA
+ sendMessage(_producerSession, _queue, NUM_MESSAGES / 2);
+
+ //Create a second consumer (consumerB) to consume some of the messages
+ MessageConsumer consumerB = _consumerSession.createConsumer(_queue);
+
+ // These messages should be roundrobined between A and B
+ sendMessage(_producerSession, _queue, NUM_MESSAGES / 2);
+
+ int count = 0;
+ //Use consumerB to receive messages it has
+ Message msg = consumerB.receive(1500);
+ while (msg != null)
+ {
+ if (mode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ msg.acknowledge();
+ }
+ count++;
+ msg = consumerB.receive(1500);
+ }
+ if (transacted)
+ {
+ _consumerSession.commit();
+ }
+
+ // Close the consumers
+ _consumerA.close();
+ consumerB.close();
+
+ // and close the session to release any prefetched messages.
+ _consumerSession.close();
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count,
+ ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
+
+ // Clean up messages that may be left on the queue
+ _consumerSession = _con.createSession(transacted, mode);
+ _consumerA = _consumerSession.createConsumer(_queue);
+ msg = _consumerA.receive(1500);
+ while (msg != null)
+ {
+ if (mode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ msg.acknowledge();
+ }
+ msg = _consumerA.receive(1500);
+ }
+ _consumerA.close();
+ if (transacted)
+ {
+ _consumerSession.commit();
+ }
+ _consumerSession.close();
+ }
+
+ public void test2ConsumersAutoAck() throws Exception
+ {
+ test2ConsumersAcking(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersClientAck() throws Exception
+ {
+ test2ConsumersAcking(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersTx() throws Exception
+ {
+ test2ConsumersAcking(true, Session.SESSION_TRANSACTED);
+ }
+
+
+
+//
+// /**
+// * Check that session level acknowledge does correctly ack all previous
+// * values. Send 3 messages(0,1,2) then ack 1 and 2. If session ack is
+// * working correctly then acking 1 will also ack 0. Acking 2 will not
+// * attempt to re-ack 0 and 1.
+// *
+// * @throws Exception
+// */
+// public void testSessionAck() throws Exception
+// {
+// init(false, Session.CLIENT_ACKNOWLEDGE);
+//
+// sendMessage(_producerSession, _queue, 3);
+// Message msg;
+//
+// // Drop msg 0
+// _consumerA.receive(RECEIVE_TIMEOUT);
+//
+// // Take msg 1
+// msg = _consumerA.receive(RECEIVE_TIMEOUT);
+//
+// assertNotNull("Message 1 not correctly received.", msg);
+// assertEquals("Incorrect message received", 1, msg.getIntProperty(INDEX));
+//
+// // This should also ack msg 0
+// msg.acknowledge();
+//
+// // Take msg 2
+// msg = _consumerA.receive(RECEIVE_TIMEOUT);
+//
+// assertNotNull("Message 2 not correctly received.", msg);
+// assertEquals("Incorrect message received", 2, msg.getIntProperty(INDEX));
+//
+// // This should just ack msg 2
+// msg.acknowledge();
+//
+// _consumerA.close();
+// _consumerSession.close();
+//
+// assertEquals("Queue not empty.", 0,
+// ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
+// _con.close();
+//
+//
+// }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
new file mode 100644
index 0000000000..f22a405fc3
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
@@ -0,0 +1,356 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener
+{
+
+ protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+ private MessageListener _listener = null;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ NUM_MESSAGES = 10;
+ }
+
+ /**
+ * Override default init to add connectionListener so we can verify that
+ * failover took place
+ *
+ * @param transacted create a transacted session for this test
+ * @param mode if not transacted what ack mode to use for this test
+ *
+ * @throws Exception if a problem occured during test setup.
+ */
+ @Override
+ public void init(boolean transacted, int mode) throws Exception
+ {
+ super.init(transacted, mode);
+ ((AMQConnection) _connection).setConnectionListener(this);
+ // Override the listener for the dirtyAck testing.
+ if (_listener != null)
+ {
+ _consumer.setMessageListener(_listener);
+ }
+ }
+
+ protected void prepBroker(int count) throws Exception
+ {
+ //Stop the connection whilst we repopulate the broker, or the no_ack
+ // test will drain the msgs before we can check we put the right number
+ // back on again.
+// _connection.stop();
+
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
+
+ if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
+ {
+ assertEquals("Wrong number of messages on queue", count,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+ }
+
+ connection.close();
+
+// _connection.start();
+ }
+
+ @Override
+ public void doAcknowlegement(Message msg) throws JMSException
+ {
+ //Acknowledge current message
+ super.doAcknowlegement(msg);
+
+ int msgCount = msg.getIntProperty(INDEX);
+
+ if (msgCount % 2 == 0)
+ {
+ failBroker(getFailingPort());
+ }
+ else
+ {
+ failBroker(getPort());
+ }
+
+ try
+ {
+ prepBroker(NUM_MESSAGES - msgCount - 1);
+ }
+ catch (Exception e)
+ {
+ fail("Unable to prep new broker," + e.getMessage());
+ }
+
+ try
+ {
+
+ if (msgCount % 2 == 0)
+ {
+ startBroker(getFailingPort());
+ }
+ else
+ {
+ startBroker(getPort());
+ }
+ }
+ catch (Exception e)
+ {
+ fail("Unable to start failover broker," + e.getMessage());
+ }
+
+ }
+
+ int msgCount = 0;
+ boolean cleaned = false;
+
+ class DirtyAckingHandler implements MessageListener
+ {
+ /**
+ * Validate first message but do nothing with it.
+ *
+ * Failover
+ *
+ * The receive the message again
+ *
+ * @param message
+ */
+ public void onMessage(Message message)
+ {
+ // Stop processing if we have an error and had to stop running.
+ if (_receviedAll.getCount() == 0)
+ {
+ _logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message);
+ return;
+ }
+
+ try
+ {
+ // Check we have the next message as expected
+ assertNotNull("Message " + msgCount + " not correctly received.", message);
+ assertEquals("Incorrect message received", msgCount, message.getIntProperty(INDEX));
+
+ if (msgCount == 0 && _failoverCompleted.getCount() != 0)
+ {
+ // This is the first message we've received so lets fail the broker
+
+ failBroker(getFailingPort());
+
+ repopulateBroker();
+
+ _logger.error("Received first msg so failing over");
+
+ return;
+ }
+
+ msgCount++;
+
+ // Don't acknowlege the first message after failover so we can commit
+ // them together
+ if (msgCount == 1)
+ {
+ _logger.error("Received first msg after failover ignoring:" + msgCount);
+
+ // Acknowledge the first message if we are now on the cleaned pass
+ if (cleaned)
+ {
+ _receviedAll.countDown();
+ }
+
+ return;
+ }
+
+ if (_consumerSession.getTransacted())
+ {
+ try
+ {
+ _consumerSession.commit();
+ if (!cleaned)
+ {
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ //expected path
+ }
+ }
+ else
+ {
+ try
+ {
+ message.acknowledge();
+ if (!cleaned)
+ {
+ fail("Session is dirty we should get an IllegalStateException");
+ }
+ }
+ catch (javax.jms.IllegalStateException ise)
+ {
+ assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage());
+ // Recover the sesion and try again.
+ _consumerSession.recover();
+ }
+ }
+
+ // Acknowledge the last message if we are in a clean state
+ // this will then trigger test teardown.
+ if (cleaned)
+ {
+ _receviedAll.countDown();
+ }
+
+ //Reset message count so we can try again.
+ msgCount = 0;
+ cleaned = true;
+ }
+ catch (Exception e)
+ {
+ // If something goes wrong stop and notifiy main thread.
+ fail(e);
+ }
+ }
+ }
+
+ /**
+ * Test that Acking/Committing a message received before failover causes
+ * an exception at commit/ack time.
+ *
+ * Expected behaviour is that in:
+ * * tx mode commit() throws a transacted RolledBackException
+ * * client ack mode throws an IllegalStateException
+ *
+ * @param transacted is this session trasacted
+ * @param mode What ack mode should be used if not trasacted
+ *
+ * @throws Exception if something goes wrong.
+ */
+ protected void testDirtyAcking(boolean transacted, int mode) throws Exception
+ {
+ NUM_MESSAGES = 2;
+ _listener = new DirtyAckingHandler();
+
+ super.testAcking(transacted, mode);
+ }
+
+ public void testDirtyClientAck() throws Exception
+ {
+ testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testDirtyAckingTransacted() throws Exception
+ {
+ testDirtyAcking(true, Session.SESSION_TRANSACTED);
+ }
+
+ private void repopulateBroker() throws Exception
+ {
+ // Repopulate this new broker so we can test what happends after failover
+
+ //Get the connection to the first (main port) broker.
+ Connection connection = getConnection();
+ // Use a transaction to send messages so we can be sure they arrive.
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, NUM_MESSAGES);
+
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+ connection.close();
+ }
+
+ // AMQConnectionListener Interface.. used so we can validate that we
+ // actually failed over.
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverCompleted.countDown();
+ }
+
+ /**
+ * Override so we can block until failover has completd
+ *
+ * @param port
+ */
+ @Override
+ public void failBroker(int port)
+ {
+ super.failBroker(port);
+
+ try
+ {
+ if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+ {
+ // Use an exception so that we use our local fail() that notifies the main thread of failure
+ throw new Exception("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+ }
+
+ }
+ catch (Exception e)
+ {
+ // Use an exception so that we use our local fail() that notifies the main thread of failure
+ fail(e);
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
new file mode 100644
index 0000000000..eb36522fac
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
@@ -0,0 +1,306 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements ConnectionListener
+{
+
+ protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ // This must be even for the test to run correctly.
+ // Otherwise we will kill the standby broker
+ // not the one we are connected to.
+ // The test will still pass but it will not be exactly
+ // as described.
+ NUM_MESSAGES = 6;
+ }
+
+ /**
+ * Override default init to add connectionListener so we can verify that
+ * failover took place
+ *
+ * @param transacted create a transacted session for this test
+ * @param mode if not transacted what ack mode to use for this test
+ * @throws Exception if a problem occured during test setup.
+ */
+ @Override
+ protected void init(boolean transacted, int mode) throws Exception
+ {
+ super.init(transacted, mode);
+ ((AMQConnection) _connection).setConnectionListener(this);
+ }
+
+ protected void prepBroker(int count) throws Exception
+ {
+ if (count % 2 == 1)
+ {
+ failBroker(getFailingPort());
+ }
+ else
+ {
+ failBroker(getPort());
+ }
+
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
+
+ if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
+ {
+ assertEquals("Wrong number of messages on queue", count,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+ }
+
+ connection.close();
+
+ try
+ {
+ if (count % 2 == 1)
+ {
+ startBroker(getFailingPort());
+ }
+ else
+ {
+ startBroker(getPort());
+ }
+ }
+ catch (Exception e)
+ {
+ fail("Unable to start failover broker," + e.getMessage());
+ }
+ }
+
+ @Override
+ public void doAcknowlegement(Message msg) throws JMSException
+ {
+ //Acknowledge current message
+ super.doAcknowlegement(msg);
+
+ try
+ {
+ prepBroker(NUM_MESSAGES - msg.getIntProperty(INDEX) - 1);
+ }
+ catch (Exception e)
+ {
+ fail("Unable to prep new broker," + e.getMessage());
+ }
+
+ }
+
+ /**
+ * Test that Acking/Committing a message received before failover causes
+ * an exception at commit/ack time.
+ *
+ * Expected behaviour is that in:
+ * * tx mode commit() throws a transacted RolledBackException
+ * * client ack mode throws an IllegalStateException
+ *
+ * @param transacted is this session trasacted
+ * @param mode What ack mode should be used if not trasacted
+ *
+ * @throws Exception if something goes wrong.
+ */
+ protected void testDirtyAcking(boolean transacted, int mode) throws Exception
+ {
+ NUM_MESSAGES = 2;
+ //Test Dirty Failover Fails
+ init(transacted, mode);
+
+ _connection.start();
+
+ Message msg = _consumer.receive(1500);
+
+ int count = 0;
+ assertNotNull("Message " + count + " not correctly received.", msg);
+ assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
+
+ //Don't acknowledge just prep the next broker. Without changing count
+ // Prep the new broker to have all all the messages so we can validate
+ // that they can all be correctly received.
+ try
+ {
+
+ //Stop the connection so we can validate the number of message count
+ // on the queue is correct after failover
+ _connection.stop();
+ failBroker(getFailingPort());
+
+ //Get the connection to the first (main port) broker.
+ Connection connection = getConnection();//getConnectionFactory("connection1").getConnectionURL());
+ // Use a transaction to send messages so we can be sure they arrive.
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, NUM_MESSAGES);
+
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+ connection.close();
+
+ //restart connection
+ _connection.start();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to prep new broker," + e.getMessage());
+ }
+
+ // Consume the next message - don't check what it is as a normal would
+ // assume it is msg 1 but as we've fallen over it is msg 0 again.
+ msg = _consumer.receive(1500);
+
+ if (_consumerSession.getTransacted())
+ {
+ try
+ {
+ _consumerSession.commit();
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ //expected path
+ }
+ }
+ else
+ {
+ try
+ {
+ msg.acknowledge();
+ fail("Session is dirty we should get an IllegalStateException");
+ }
+ catch (javax.jms.IllegalStateException ise)
+ {
+ assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage());
+ // Recover the sesion and try again.
+ _consumerSession.recover();
+ }
+ }
+
+ msg = _consumer.receive(1500);
+ // Validate we now get the first message back
+ assertEquals(0, msg.getIntProperty(INDEX));
+
+ msg = _consumer.receive(1500);
+ // and the second message
+ assertEquals(1, msg.getIntProperty(INDEX));
+
+ // And now verify that we can now commit the clean session
+ if (_consumerSession.getTransacted())
+ {
+ _consumerSession.commit();
+ }
+ else
+ {
+ msg.acknowledge();
+ }
+
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ public void testDirtyClientAck() throws Exception
+ {
+ testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testDirtyAckingTransacted() throws Exception
+ {
+ testDirtyAcking(true, Session.SESSION_TRANSACTED);
+ }
+
+ // AMQConnectionListener Interface.. used so we can validate that we
+ // actually failed over.
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverCompleted.countDown();
+ }
+
+ /**
+ * Override so we can block until failover has completd
+ *
+ * @param port
+ */
+ @Override
+ public void failBroker(int port)
+ {
+ super.failBroker(port);
+
+ try
+ {
+ if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+ {
+ fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ fail("Failover was interuppted");
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
new file mode 100644
index 0000000000..4254727d36
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.client.failover.FailoverException;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
+{
+ protected CountDownLatch _receviedAll;
+ protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ @Override
+ public void init(boolean transacted, int mode) throws Exception
+ {
+ _receviedAll = new CountDownLatch(NUM_MESSAGES);
+
+ super.init(transacted, mode);
+ _consumer.setMessageListener(this);
+ }
+
+ /**
+ * @param transacted
+ * @param mode
+ *
+ * @throws Exception
+ */
+ protected void testAcking(boolean transacted, int mode) throws Exception
+ {
+ init(transacted, mode);
+
+ _connection.start();
+
+ int lastCount = (int) _receviedAll.getCount();
+
+ boolean complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS);
+
+ while (!complete)
+ {
+ int currentCount = (int) _receviedAll.getCount();
+
+ // make sure we have received a message in the last cycle.
+ if (lastCount == currentCount)
+ {
+ break;
+ }
+ // Remember the currentCount as the lastCount for the next cycle.
+ // so we can exit if things get locked up.
+ lastCount = currentCount;
+
+ complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS);
+ }
+
+ if (!complete)
+ {
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+ else
+ {
+ fail("All messages not received missing:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
+ }
+ }
+
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+
+ try
+ {
+ _consumer.close();
+ }
+ catch (JMSAMQException amqe)
+ {
+ if (amqe.getLinkedException() instanceof FailoverException)
+ {
+ fail("QPID-143 : Auto Ack can acknowledge message from previous session after failver. If failover occurs between deliver and ack.");
+ }
+ // else Rethrow for TestCase to catch.
+ throw amqe;
+ }
+
+ _consumerSession.close();
+
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
+ }
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ int count = NUM_MESSAGES - (int) _receviedAll.getCount();
+
+ assertEquals("Incorrect message received", count, message.getIntProperty(INDEX));
+
+ count++;
+ if (count < NUM_MESSAGES)
+ {
+ //Send the next message
+ _producer.send(createNextMessage(_consumerSession, count));
+ }
+
+ doAcknowlegement(message);
+
+ _receviedAll.countDown();
+ }
+ catch (Exception e)
+ {
+ // This will end the test run by counting down _receviedAll
+ fail(e);
+ }
+ }
+
+ /**
+ * Pass the given exception back to the waiting thread to fail the test run.
+ *
+ * @param e The exception that is causing the test to fail.
+ */
+ protected void fail(Exception e)
+ {
+ _causeOfFailure.set(e);
+ // End the test.
+ while (_receviedAll.getCount() != 0)
+ {
+ _receviedAll.countDown();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
index c367a0856c..7c9a77eb53 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
@@ -1,7 +1,5 @@
-package org.apache.qpid.test.unit.ack;
-
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,133 +19,138 @@ package org.apache.qpid.test.unit.ack;
*
*/
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.MessageProducer;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AcknowledgeTest extends QpidTestCase
+public class AcknowledgeTest extends FailoverBaseCase
{
- protected static int NUM_MESSAGES = 100;
- protected Connection _con;
+ protected int NUM_MESSAGES;
+ protected Connection _connection;
protected Queue _queue;
- private MessageProducer _producer;
- private Session _producerSession;
- private Session _consumerSession;
- private MessageConsumer _consumerA;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ protected MessageProducer _producer;
@Override
protected void setUp() throws Exception
{
super.setUp();
- _queue = (Queue) getInitialContext().lookup("queue");
+ NUM_MESSAGES = 5;
+
+ _queue = getTestQueue();
//Create Producer put some messages on the queue
- _con = getConnection();
- _con.start();
+ _connection = getConnection();
}
- private void init(boolean transacted, int mode) throws JMSException {
- _producerSession = _con.createSession(true, Session.AUTO_ACKNOWLEDGE);
- _consumerSession = _con.createSession(transacted, mode);
- _producer = _producerSession.createProducer(_queue);
- _consumerA = _consumerSession.createConsumer(_queue);
- }
+ protected void init(boolean transacted, int mode) throws Exception
+ {
+ _consumerSession = _connection.createSession(transacted, mode);
+ _consumer = _consumerSession.createConsumer(_queue);
+ _producer = _consumerSession.createProducer(_queue);
+
+ // These should all end up being prefetched by session
+ sendMessage(_consumerSession, _queue, 1);
+
+ assertEquals("Wrong number of messages on queue", 1,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
/**
- * Produces and consumes messages an either ack or commit the receipt of those messages
- *
* @param transacted
* @param mode
+ *
* @throws Exception
*/
- private void testMessageAck(boolean transacted, int mode) throws Exception
+ protected void testAcking(boolean transacted, int mode) throws Exception
{
- init(transacted, mode);
- sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
- _producerSession.commit();
- MessageConsumer consumerB = _consumerSession.createConsumer(_queue);
- sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
- _producerSession.commit();
+ init(transacted, mode);
+
+ _connection.start();
+
+ Message msg = _consumer.receive(1500);
+
int count = 0;
- Message msg = consumerB.receive(1500);
- while (msg != null)
+ while (count < NUM_MESSAGES)
{
- if (mode == Session.CLIENT_ACKNOWLEDGE)
+ assertNotNull("Message " + count + " not correctly received.", msg);
+ assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
+ count++;
+
+ if (count < NUM_MESSAGES)
{
- msg.acknowledge();
+ //Send the next message
+ _producer.send(createNextMessage(_consumerSession, count));
}
- count++;
- msg = consumerB.receive(1500);
+
+ doAcknowlegement(msg);
+
+ msg = _consumer.receive(1500);
}
- if (transacted)
- {
- _consumerSession.commit();
- }
- _consumerA.close();
- consumerB.close();
- _consumerSession.close();
- assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count,
- ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
-
- // Clean up messages that may be left on the queue
- _consumerSession = _con.createSession(transacted, mode);
- _consumerA = _consumerSession.createConsumer(_queue);
- msg = _consumerA.receive(1500);
- while (msg != null)
+
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ /**
+ * Perform the acknowledgement of messages if additionally required.
+ *
+ * @param msg
+ *
+ * @throws JMSException
+ */
+ protected void doAcknowlegement(Message msg) throws JMSException
+ {
+ if (_consumerSession.getTransacted())
{
- if (mode == Session.CLIENT_ACKNOWLEDGE)
- {
- msg.acknowledge();
- }
- msg = _consumerA.receive(1500);
+ _consumerSession.commit();
}
- _consumerA.close();
- if (transacted)
+
+ if (_consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
- _consumerSession.commit();
+ msg.acknowledge();
}
- _consumerSession.close();
}
-
- public void test2ConsumersAutoAck() throws Exception
+
+ public void testClientAck() throws Exception
{
- testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+ testAcking(false, Session.CLIENT_ACKNOWLEDGE);
}
- public void test2ConsumersClientAck() throws Exception
+ public void testAutoAck() throws Exception
{
- testMessageAck(true, Session.CLIENT_ACKNOWLEDGE);
+ testAcking(false, Session.AUTO_ACKNOWLEDGE);
}
-
- public void test2ConsumersTx() throws Exception
+
+ public void testTransacted() throws Exception
{
- testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+ testAcking(true, Session.SESSION_TRANSACTED);
}
-
- public void testIndividualAck() throws Exception
+
+ public void testDupsOk() throws Exception
{
- init(false, Session.CLIENT_ACKNOWLEDGE);
- sendMessage(_producerSession, _queue, 3);
- _producerSession.commit();
- Message msg = null;
- for (int i = 0; i < 2; i++)
- {
- msg = _consumerA.receive(RECEIVE_TIMEOUT);
- ((AbstractJMSMessage)msg).acknowledgeThis();
- }
- msg = _consumerA.receive(RECEIVE_TIMEOUT);
- msg.acknowledge();
- _con.close();
+ testAcking(false, Session.DUPS_OK_ACKNOWLEDGE);
}
-
+
+ public void testNoAck() throws Exception
+ {
+ testAcking(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testPreAck() throws Exception
+ {
+ testAcking(false, AMQSession.PRE_ACKNOWLEDGE);
+ }
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
new file mode 100644
index 0000000000..834b17430b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.jms.Session;
+
+import javax.jms.Message;
+import javax.jms.Queue;
+
+public class FailoverBeforeConsumingRecoverTest extends RecoverTest
+{
+
+ @Override
+ protected void initTest() throws Exception
+ {
+ super.initTest();
+ failBroker(getFailingPort());
+
+ Queue queue = _consumerSession.createQueue(getTestQueueName());
+ sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
new file mode 100644
index 0000000000..6c4b7ba01b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * This is a quick manual test to validate acking after failover with a
+ * transacted session.
+ *
+ * Start an external broker then run this test. Std Err will print.
+ * Sent Message: 1
+ * Received Message: 1
+ *
+ * You can then restart the external broker, which will cause failover, which
+ * will be complete when the following appears.
+ *
+ * Failover Complete
+ *
+ * A second message send/receive cycle is then done to validate that the
+ * connection/session are still working.
+ *
+ */
+public class QuickAcking extends QpidTestCase implements ConnectionListener
+{
+ protected AMQConnection _connection;
+ protected Queue _queue;
+ protected Session _session;
+ protected MessageConsumer _consumer;
+ private CountDownLatch _failedOver;
+ private static final String INDEX = "INDEX";
+ private int _count = 0;
+
+ public void setUp()
+ {
+ // Prevent broker startup. Broker must be run manually.
+ }
+
+ public void test() throws Exception
+ {
+ _failedOver = new CountDownLatch(1);
+
+ _connection = new AMQConnection("amqp://guest:guest@client/test?brokerlist='localhost?retries='20'&connectdelay='2000''");
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queue = _session.createQueue("QAtest");
+ _consumer = _session.createConsumer(_queue);
+ _connection.setConnectionListener(this);
+ _connection.start();
+
+ sendAndReceive();
+
+ _failedOver.await();
+
+ sendAndReceive();
+
+ }
+
+ private void sendAndReceive()
+ throws Exception
+ {
+ sendMessage();
+
+ Message message = _consumer.receive();
+
+ if (message.getIntProperty(INDEX) != _count)
+ {
+ throw new Exception("Incorrect message recieved:" + _count);
+ }
+
+ if (_session.getTransacted())
+ {
+ _session.commit();
+ }
+ System.err.println("Recevied Message:" + _count);
+ }
+
+ private void sendMessage() throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+ Message message = _session.createMessage();
+ _count++;
+ message.setIntProperty(INDEX, _count);
+
+ producer.send(message);
+ if (_session.getTransacted())
+ {
+ _session.commit();
+ }
+ producer.close();
+
+ System.err.println("Sent Message:" + _count);
+ }
+
+ public void bytesSent(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bytesReceived(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ System.err.println("Failover Complete");
+ _failedOver.countDown();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 7434fcbb30..4a123cb1dc 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -23,8 +23,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.test.utils.QpidTestCase;
-
+import org.apache.qpid.test.utils.FailoverBaseCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,16 +34,21 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
-
import java.util.concurrent.atomic.AtomicInteger;
-public class RecoverTest extends QpidTestCase
+public class RecoverTest extends FailoverBaseCase
{
- private static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
+ static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
private Exception _error;
private AtomicInteger count;
+ protected AMQConnection _connection;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ static final int SENT_COUNT = 4;
+
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -52,134 +56,110 @@ public class RecoverTest extends QpidTestCase
count = new AtomicInteger();
}
- protected void tearDown() throws Exception
+ protected void initTest() throws Exception
{
- super.tearDown();
- count = null;
- }
+ _connection = (AMQConnection) getConnection("guest", "guest");
- public void testRecoverResendsMsgs() throws Exception
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = _consumerSession.createQueue(getTestQueueName());
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer = _consumerSession.createConsumer(queue);
_logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
-
- con2.close();
-
+ sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);
_logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- tm.acknowledge();
- _logger.info("Received and acknowledged first message");
- consumer.receive();
- consumer.receive();
- consumer.receive();
- _logger.info("Received all four messages. Calling recover with three outstanding messages");
- // no ack for last three messages so when I call recover I expect to get three messages back
- consumerSession.recover();
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg2", tm.getText());
+ _connection.start();
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm.getText());
+ protected Message validateNextMessages(int nextCount, int startIndex) throws JMSException
+ {
+ Message message = null;
+ for (int index = 0; index < nextCount; index++)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(startIndex + index, message.getIntProperty(INDEX));
+ }
+ return message;
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm.getText());
+ protected void validateRemainingMessages(int remaining) throws JMSException
+ {
+ int index = SENT_COUNT - remaining;
- _logger.info("Received redelivery of three messages. Acknowledging last message");
- tm.acknowledge();
+ Message message = null;
+ while (index != SENT_COUNT)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(index++, message.getIntProperty(INDEX));
+ }
+
+ if (message != null)
+ {
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ message.acknowledge();
+ }
_logger.info("Calling acknowledge with no outstanding messages");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
+ message = _consumer.receiveNoWait();
+ assertNull(message);
_logger.info("No messages redelivered as is expected");
-
- con.close();
}
- public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ public void testRecoverResendsMsgs() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ initTest();
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ Message message = validateNextMessages(1, 0);
+ message.acknowledge();
+ _logger.info("Received and acknowledged first message");
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
+ _logger.info("Received all four messages. Calling recover with three outstanding messages");
+ // no ack for last three messages so when I call recover I expect to get three messages back
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
+ _consumerSession.recover();
- con2.close();
+ validateRemainingMessages(3);
+ }
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- consumer.receive();
- tm.acknowledge();
+ public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ {
+ initTest();
+
+ Message message = validateNextMessages(2, 0);
+ message.acknowledge();
_logger.info("Received 2 messages, acknowledge() first message, should acknowledge both");
- consumer.receive();
- consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
_logger.info("Received all four messages. Calling recover with two outstanding messages");
// no ack for last three messages so when I call recover I expect to get three messages back
- consumerSession.recover();
- TextMessage tm3 = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm3.getText());
+ _consumerSession.recover();
+
+ Message message2 = _consumer.receive(3000);
+ assertEquals(2, message2.getIntProperty(INDEX));
- TextMessage tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
+ Message message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
_logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
- ((org.apache.qpid.jms.Message) tm3).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) message2).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
- ((org.apache.qpid.jms.Message) tm4).acknowledgeThis();
+ message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
+ ((org.apache.qpid.jms.Message) message3).acknowledgeThis();
- _logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
-
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
-
- con.close();
+ validateRemainingMessages(0);
}
public void testAcknowledgePerConsumer() throws Exception
@@ -188,11 +168,11 @@ public class RecoverTest extends QpidTestCase
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"),
+ false, true);
Queue queue2 =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
@@ -231,8 +211,8 @@ public class RecoverTest extends QpidTestCase
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = consumerSession.createProducer(queue);
producer.send(consumerSession.createTextMessage("hello"));
@@ -240,50 +220,50 @@ public class RecoverTest extends QpidTestCase
final Object lock = new Object();
consumer.setMessageListener(new MessageListener()
- {
+ {
- public void onMessage(Message message)
+ public void onMessage(Message message)
+ {
+ try
{
- try
+ count.incrementAndGet();
+ if (count.get() == 1)
{
- count.incrementAndGet();
- if (count.get() == 1)
+ if (message.getJMSRedelivered())
{
- if (message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception("Message marked as redilvered on what should be first delivery attempt"));
- }
-
- consumerSession.recover();
}
- else if (count.get() == 2)
+
+ consumerSession.recover();
+ }
+ else if (count.get() == 2)
+ {
+ if (!message.getJMSRedelivered())
{
- if (!message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception(
- "Message not marked as redilvered on what should be second delivery attempt"));
- }
- }
- else
- {
- System.err.println(message);
- fail("Message delivered too many times!: " + count);
+ "Message not marked as redilvered on what should be second delivery attempt"));
}
}
- catch (JMSException e)
+ else
{
- _logger.error("Error recovering session: " + e, e);
- setError(e);
+ System.err.println(message);
+ fail("Message delivered too many times!: " + count);
}
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error recovering session: " + e, e);
+ setError(e);
+ }
- synchronized (lock)
- {
- lock.notify();
- }
+ synchronized (lock)
+ {
+ lock.notify();
}
- });
+ }
+ });
con.start();
@@ -323,9 +303,4 @@ public class RecoverTest extends QpidTestCase
{
_error = e;
}
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(RecoverTest.class);
- }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SelectorTest.java
deleted file mode 100644
index c42e4c7582..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SelectorTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.basic;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BasicMessageProducer;
-import org.apache.qpid.client.state.StateWaiter;
-import org.apache.qpid.url.URLSyntaxException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.concurrent.CountDownLatch;
-
-public class SelectorTest extends QpidTestCase implements MessageListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(SelectorTest.class);
-
- private AMQConnection _connection;
- private AMQDestination _destination;
- private AMQSession _session;
- private int count;
- public String _connectionString = "vm://:1";
- private static final String INVALID_SELECTOR = "Cost LIKE 5";
- CountDownLatch _responseLatch = new CountDownLatch(1);
-
- protected void setUp() throws Exception
- {
- super.setUp();
- init((AMQConnection) getConnection("guest", "guest"));
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- private void init(AMQConnection connection) throws JMSException
- {
- init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true));
- }
-
- private void init(AMQConnection connection, AMQDestination destination) throws JMSException
- {
- _connection = connection;
- _destination = destination;
- connection.start();
-
- String selector = null;
- selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
- // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
-
- _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- // _session.createConsumer(destination).setMessageListener(this);
- _session.createConsumer(destination, selector).setMessageListener(this);
- }
-
- public void test() throws Exception
- {
- try
- {
-
- init((AMQConnection) getConnection("guest", "guest", randomize("Client")));
-
- Message msg = _session.createTextMessage("Message");
- msg.setJMSPriority(1);
- msg.setIntProperty("Cost", 2);
- msg.setStringProperty("property-with-hyphen", "wibble");
- msg.setJMSType("Special");
-
- _logger.info("Sending Message:" + msg);
-
- ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
- _logger.info("Message sent, waiting for response...");
-
- _responseLatch.await();
-
- if (count > 0)
- {
- _logger.info("Got message");
- }
-
- if (count == 0)
- {
- fail("Did not get message!");
- // throw new RuntimeException("Did not get message!");
- }
- }
- catch (JMSException e)
- {
- _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
- if (!(e instanceof InvalidSelectorException))
- {
- fail("Wrong exception:" + e.getMessage());
- }
- else
- {
- System.out.println("SUCCESS!!");
- }
- }
- catch (InterruptedException e)
- {
- _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
- }
- catch (URLSyntaxException e)
- {
- _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage());
- fail("Wrong exception");
- }
- catch (AMQException e)
- {
- _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage());
- fail("Wrong exception");
- }
-
- finally
- {
- if (_session != null)
- {
- _session.close();
- }
- if (_connection != null)
- {
- _connection.close();
- }
- }
- }
-
-
- public void testInvalidSelectors() throws Exception
- {
- Connection connection = null;
-
- try
- {
- connection = getConnection("guest", "guest", randomize("Client"));
- _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- }
- catch (JMSException e)
- {
- fail(e.getMessage());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- catch (URLSyntaxException e)
- {
- fail("Error:" + e.getMessage());
- }
-
- //Try Creating a Browser
- try
- {
- _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR);
- }
- catch (JMSException e)
- {
- _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
- if (!(e instanceof InvalidSelectorException))
- {
- fail("Wrong exception:" + e.getMessage());
- }
- else
- {
- _logger.debug("SUCCESS!!");
- }
- }
-
- //Try Creating a Consumer
- try
- {
- _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR);
- }
- catch (JMSException e)
- {
- _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
- if (!(e instanceof InvalidSelectorException))
- {
- fail("Wrong exception:" + e.getMessage());
- }
- else
- {
- _logger.debug("SUCCESS!!");
- }
- }
-
- //Try Creating a Receiever
- try
- {
- _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR);
- }
- catch (JMSException e)
- {
- _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
- if (!(e instanceof InvalidSelectorException))
- {
- fail("Wrong exception:" + e.getMessage());
- }
- else
- {
- _logger.debug("SUCCESS!!");
- }
- }
-
- finally
- {
- if (_session != null)
- {
- try
- {
- _session.close();
- }
- catch (JMSException e)
- {
- fail("Error cleaning up:" + e.getMessage());
- }
- }
- if (_connection != null)
- {
- try
- {
- _connection.close();
- }
- catch (JMSException e)
- {
- fail("Error cleaning up:" + e.getMessage());
- }
- }
- }
- }
-
- public void onMessage(Message message)
- {
- count++;
- _logger.info("Got Message:" + message);
- _responseLatch.countDown();
- }
-
- private static String randomize(String in)
- {
- return in + System.currentTimeMillis();
- }
-
- public static void main(String[] argv) throws Exception
- {
- SelectorTest test = new SelectorTest();
- test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
-
- try
- {
- while (true)
- {
- if (test._connectionString.contains("vm://:1"))
- {
- test.setUp();
- }
- test.test();
-
- if (test._connectionString.contains("vm://:1"))
- {
- test.tearDown();
- }
- }
- }
- catch (Exception e)
- {
- System.err.println(e.getMessage());
- e.printStackTrace();
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(SelectorTest.class);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
index 1f90f1e29f..6b69ccab6c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Destination;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
@@ -105,7 +106,8 @@ public class JMSPropertiesTest extends QpidTestCase
assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType());
assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo());
assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:"));
-
+ assertEquals("JMS Default priority should be 4",Message.DEFAULT_PRIORITY,rm.getJMSPriority());
+
//Validate that the JMSX values are correct
assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID"));
assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq"));
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java
new file mode 100644
index 0000000000..248042d2c4
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java
@@ -0,0 +1,403 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.publish;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * QPID-1816 : Whilst testing Acknoledgement after failover this completes testing
+ * of the client after failover. When we have a dirty session we should receive
+ * an error if we attempt to publish. This test ensures that both in the synchronous
+ * and asynchronous message delivery paths we receive the expected exceptions at
+ * the expected time.
+ */
+public class DirtyTrasactedPubilshTest extends FailoverBaseCase implements ConnectionListener
+{
+ protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+
+ protected int NUM_MESSAGES;
+ protected Connection _connection;
+ protected Queue _queue;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ protected MessageProducer _producer;
+
+ private static final String MSG = "MSG";
+ private static final String SEND_FROM_ON_MESSAGE_TEXT = "sendFromOnMessage";
+ protected CountDownLatch _receviedAll;
+ protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ NUM_MESSAGES = 10;
+
+ _queue = getTestQueue();
+
+ //Create Producer put some messages on the queue
+ _connection = getConnection();
+ }
+
+ /**
+ * Initialise the test variables
+ * @param transacted is this a transacted test
+ * @param mode if not trasacted then what ack mode to use
+ * @throws Exception if there is a setup issue.
+ */
+ protected void init(boolean transacted, int mode) throws Exception
+ {
+ _consumerSession = _connection.createSession(transacted, mode);
+ _consumer = _consumerSession.createConsumer(_queue);
+ _producer = _consumerSession.createProducer(_queue);
+
+ // These should all end up being prefetched by session
+ sendMessage(_consumerSession, _queue, 1);
+
+ assertEquals("Wrong number of messages on queue", 1,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ /**
+ * If a transacted session has failed over whilst it has uncommitted sent
+ * data then we need to throw a TransactedRolledbackException on commit()
+ *
+ * The alternative would be to maintain a replay buffer so that the message
+ * could be resent. This is not currently implemented
+ *
+ * @throws Exception if something goes wrong.
+ */
+ public void testDirtySendingSynchronousTransacted() throws Exception
+ {
+ Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ // Ensure we get failover notifications
+ ((AMQConnection) _connection).setConnectionListener(this);
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ // Create and send message 0
+ Message msg = producerSession.createMessage();
+ msg.setIntProperty(INDEX, 0);
+ producer.send(msg);
+
+ // DON'T commit message .. fail connection
+
+ failBroker(getFailingPort());
+
+ // Ensure destination exists for sending
+ producerSession.createConsumer(_queue).close();
+
+ // Send the next message
+ msg.setIntProperty(INDEX, 1);
+ try
+ {
+ producer.send(msg);
+ fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
+ }
+ catch (JMSException jmse)
+ {
+ assertEquals("Early warning of dirty session not correct",
+ "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
+ }
+
+ // Ignore that the session is dirty and attempt to commit to validate the
+ // exception is thrown. AND that the above failure notification did NOT
+ // clean up the session.
+
+ try
+ {
+ producerSession.commit();
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ // Normal path.
+ }
+
+ // Resending of messages should now work ok as the commit was forcilbly rolledback
+ msg.setIntProperty(INDEX, 0);
+ producer.send(msg);
+ msg.setIntProperty(INDEX, 1);
+ producer.send(msg);
+
+ producerSession.commit();
+
+ assertEquals("Wrong number of messages on queue", 2,
+ ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ /**
+ * If a transacted session has failed over whilst it has uncommitted sent
+ * data then we need to throw a TransactedRolledbackException on commit()
+ *
+ * The alternative would be to maintain a replay buffer so that the message
+ * could be resent. This is not currently implemented
+ *
+ * @throws Exception if something goes wrong.
+ */
+ public void testDirtySendingOnMessageTransacted() throws Exception
+ {
+ NUM_MESSAGES = 1;
+ _receviedAll = new CountDownLatch(NUM_MESSAGES);
+ ((AMQConnection) _connection).setConnectionListener(this);
+
+ init(true, Session.SESSION_TRANSACTED);
+
+ _consumer.setMessageListener(new MessageListener()
+ {
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ // Create and send message 0
+ Message msg = _consumerSession.createMessage();
+ msg.setIntProperty(INDEX, 0);
+ _producer.send(msg);
+
+ // DON'T commit message .. fail connection
+
+ failBroker(getFailingPort());
+
+ // rep
+ repopulateBroker();
+
+ // Destination will exist as this failBroker will populate
+ // the queue with 1 message
+
+ // Send the next message
+ msg.setIntProperty(INDEX, 1);
+ try
+ {
+ _producer.send(msg);
+ fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
+ }
+ catch (JMSException jmse)
+ {
+ assertEquals("Early warning of dirty session not correct",
+ "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
+ }
+
+ // Ignore that the session is dirty and attempt to commit to validate the
+ // exception is thrown. AND that the above failure notification did NOT
+ // clean up the session.
+
+ try
+ {
+ _consumerSession.commit();
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ // Normal path.
+ }
+
+ // Resend messages
+ msg.setIntProperty(INDEX, 0);
+ msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT);
+ _producer.send(msg);
+ msg.setIntProperty(INDEX, 1);
+ msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT);
+ _producer.send(msg);
+
+ _consumerSession.commit();
+
+ // Stop this consumer .. can't do _consumer.stop == DEADLOCK
+ // this doesn't seem to stop dispatcher running
+ _connection.stop();
+
+ // Signal that the onMessage send part of test is complete
+ // main thread can validate that messages are correct
+ _receviedAll.countDown();
+
+ }
+ catch (Exception e)
+ {
+ fail(e);
+ }
+
+ }
+
+ });
+
+ _connection.start();
+
+ if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS))
+ {
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+ else
+ {
+ fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
+ }
+ }
+
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+
+ _consumer.close();
+ _consumerSession.close();
+
+ _consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _connection.start();
+
+ // Validate that we could send the messages as expected.
+ assertEquals("Wrong number of messages on queue", 3,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+
+ MessageConsumer consumer = _consumerSession.createConsumer(_queue);
+
+ //Validate the message sent to setup the failed over broker.
+ Message message = consumer.receive(1000);
+ assertNotNull("Message " + 0 + " not received.", message);
+ assertEquals("Incorrect message received", 0, message.getIntProperty(INDEX));
+
+ // Validate the two messages sent from within the onMessage
+ for (int index = 0; index <= 1; index++)
+ {
+ message = consumer.receive(1000);
+ assertNotNull("Message " + index + " not received.", message);
+ assertEquals("Incorrect message received", index, message.getIntProperty(INDEX));
+ assertEquals("Incorrect message text for message:" + index, SEND_FROM_ON_MESSAGE_TEXT, message.getStringProperty(MSG));
+ }
+
+ assertNull("Extra message received.", consumer.receiveNoWait());
+
+ _consumerSession.close();
+
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
+ }
+
+ private void repopulateBroker() throws Exception
+ {
+ // Repopulate this new broker so we can test what happends after failover
+
+ //Get the connection to the first (main port) broker.
+ Connection connection = getConnection();
+ // Use a transaction to send messages so we can be sure they arrive.
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, NUM_MESSAGES);
+
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+ connection.close();
+ }
+
+ // AMQConnectionListener Interface.. used so we can validate that we
+ // actually failed over.
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverCompleted.countDown();
+ }
+
+ /**
+ * Override so we can block until failover has completd
+ *
+ * @param port int the port of the broker to fail.
+ */
+ @Override
+ public void failBroker(int port)
+ {
+ super.failBroker(port);
+
+ try
+ {
+ if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+ {
+ fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ fail("Failover was interuppted");
+ }
+ }
+
+ /**
+ * Pass the given exception back to the waiting thread to fail the test run.
+ *
+ * @param e The exception that is causing the test to fail.
+ */
+ protected void fail(Exception e)
+ {
+ _causeOfFailure.set(e);
+ // End the test.
+ while (_receviedAll.getCount() != 0)
+ {
+ _receviedAll.countDown();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 64bd1503ba..0426c4f45f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -20,44 +20,43 @@
*/
package org.apache.qpid.test.utils;
-import javax.jms.Connection;
-import javax.jms.JMSException;
+import org.apache.qpid.util.FileUtils;
+
import javax.naming.NamingException;
-import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FailoverBaseCase extends QpidTestCase
{
+ protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class);
public static int FAILING_VM_PORT = 2;
- public static int FAILING_PORT = DEFAULT_PORT + 100;
+ public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt"));
+ public static final long DEFAULT_FAILOVER_TIME = 10000L;
protected int failingPort;
-
- private boolean failedOver = false;
- public FailoverBaseCase()
+ protected int getFailingPort()
{
if (_broker.equals(VM))
{
- failingPort = FAILING_VM_PORT;
+ return FAILING_VM_PORT;
}
else
{
- failingPort = FAILING_PORT;
+ return FAILING_PORT;
}
}
-
- protected int getFailingPort()
- {
- return failingPort;
- }
protected void setUp() throws java.lang.Exception
{
super.setUp();
- setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
- startBroker(failingPort);
+ // Set QPID_WORK to $QPID_WORK/<getFailingPort()>
+ // or /tmp/<getFailingPort()> if QPID_WORK not set.
+ setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK") + "/" + getFailingPort());
+ startBroker(getFailingPort());
}
/**
@@ -66,42 +65,52 @@ public class FailoverBaseCase extends QpidTestCase
* @return a connection
* @throws Exception
*/
- public Connection getConnection() throws JMSException, NamingException
+ @Override
+ public AMQConnectionFactory getConnectionFactory() throws NamingException
{
- Connection conn =
- (Boolean.getBoolean("profile.use_ssl"))?
- getConnectionFactory("failover.ssl").createConnection("guest", "guest"):
- getConnectionFactory("failover").createConnection("guest", "guest");
- _connections.add(conn);
- return conn;
+ _logger.info("get ConnectionFactory");
+ if (_connectionFactory == null)
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ _connectionFactory = getConnectionFactory("failover.ssl");
+ }
+ else
+ {
+ _connectionFactory = getConnectionFactory("failover");
+ }
+ }
+ return _connectionFactory;
}
+
public void tearDown() throws Exception
{
- stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT);
- super.tearDown();
- FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ // Ensure we shutdown any secondary brokers, even if we are unable
+ // to cleanly tearDown the QTC.
+ stopBroker(getFailingPort());
+ FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
+ }
}
- /**
- * Only used of VM borker.
- */
- public void failBroker()
+ public void failBroker(int port)
{
- failedOver = true;
try
{
- stopBroker(getFailingPort());
+ stopBroker(port);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
-
- protected void setFailingPort(int p)
- {
- failingPort = p;
- }
+
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index 76e4118c96..6c1b1c7b8d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -22,8 +22,10 @@ import junit.framework.TestResult;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
@@ -32,6 +34,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.store.DerbyMessageStore;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,13 +73,18 @@ public class QpidTestCase extends TestCase
protected final String QpidHome = System.getProperty("QPID_HOME");
protected File _configFile = new File(System.getProperty("broker.config"));
- private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
+ protected static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
protected long RECEIVE_TIMEOUT = 1000l;
- private Map<String, String> _setProperties = new HashMap<String, String>();
+ private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>();
+ private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>();
+ private Map<org.apache.log4j.Logger, Level> _loggerLevelSetForTest = new HashMap<org.apache.log4j.Logger, Level>();
+
private XMLConfiguration _testConfiguration = new XMLConfiguration();
+ protected static final String INDEX = "index";
+
/**
* Some tests are excluded when the property test.excludes is set to true.
* An exclusion list is either a file (prop test.excludesfile) which contains one test name
@@ -147,6 +155,7 @@ public class QpidTestCase extends TestCase
private static final String BROKER_LANGUAGE = "broker.language";
private static final String BROKER = "broker";
private static final String BROKER_CLEAN = "broker.clean";
+ private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests";
private static final String BROKER_VERSION = "broker.version";
protected static final String BROKER_READY = "broker.ready";
private static final String BROKER_STOPPED = "broker.stopped";
@@ -169,6 +178,7 @@ public class QpidTestCase extends TestCase
protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA);
protected String _broker = System.getProperty(BROKER, VM);
private String _brokerClean = System.getProperty(BROKER_CLEAN, null);
+ private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
private String _output = System.getProperty(TEST_OUTPUT);
@@ -177,7 +187,7 @@ public class QpidTestCase extends TestCase
private Map<Integer, Process> _brokers = new HashMap<Integer, Process>();
private InitialContext _initialContext;
- private AMQConnectionFactory _connectionFactory;
+ protected AMQConnectionFactory _connectionFactory;
private String _testName;
@@ -235,6 +245,19 @@ public class QpidTestCase extends TestCase
{
_logger.error("exception stopping broker", e);
}
+
+ if(_brokerCleanBetweenTests)
+ {
+ try
+ {
+ cleanBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("exception cleaning up broker", e);
+ }
+ }
+
_logger.info("========== stop " + _testName + " ==========");
if (redirected)
@@ -451,6 +474,15 @@ public class QpidTestCase extends TestCase
env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\"");
env.put("QPID_WORK", System.getProperty("QPID_WORK"));
+
+ // Use the environment variable to set amqj.logging.level for the broker
+ // The value used is a 'server' value in the test configuration to
+ // allow a differentiation between the client and broker logging levels.
+ if (System.getProperty("amqj.server.logging.level") != null)
+ {
+ setBrokerEnvironment("AMQJ_LOGGING_LEVEL", System.getProperty("amqj.server.logging.level"));
+ }
+
// Add all the environment settings the test requested
if (!_env.isEmpty())
{
@@ -460,13 +492,27 @@ public class QpidTestCase extends TestCase
}
}
+
+ // Add default test logging levels that are used by the log4j-test
+ // Use the convenience methods to push the current logging setting
+ // in to the external broker's QPID_OPTS string.
+ if (System.getProperty("amqj.protocol.logging.level") != null)
+ {
+ setSystemProperty("amqj.protocol.logging.level");
+ }
+ if (System.getProperty("root.logging.level") != null)
+ {
+ setSystemProperty("root.logging.level");
+ }
+
+
String QPID_OPTS = " ";
// Add all the specified system properties to QPID_OPTS
- if (!_setProperties.isEmpty())
+ if (!_propertiesSetForBroker.isEmpty())
{
- for (String key : _setProperties.keySet())
+ for (String key : _propertiesSetForBroker.keySet())
{
- QPID_OPTS += "-D" + key + "=" + System.getProperty(key) + " ";
+ QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " ";
}
if (env.containsKey("QPID_OPTS"))
@@ -489,7 +535,7 @@ public class QpidTestCase extends TestCase
if (!p.await(30, TimeUnit.SECONDS))
{
- _logger.info("broker failed to become ready:" + p.getStopLine());
+ _logger.info("broker failed to become ready (" + p.ready + "):" + p.getStopLine());
//Ensure broker has stopped
process.destroy();
cleanBroker();
@@ -667,28 +713,87 @@ public class QpidTestCase extends TestCase
}
/**
+ * Set a System property that is to be applied only to the external test
+ * broker.
+ *
+ * This is a convenience method to enable the setting of a -Dproperty=value
+ * entry in QPID_OPTS
+ *
+ * This is only useful for the External Java Broker tests.
+ *
+ * @param property the property name
+ * @param value the value to set the property to
+ */
+ protected void setBrokerOnlySystemProperty(String property, String value)
+ {
+ if (!_propertiesSetForBroker.containsKey(property))
+ {
+ _propertiesSetForBroker.put(property, value);
+ }
+
+ }
+
+ /**
+ * Set a System (-D) property for this test run.
+ *
+ * This convenience method copies the current VMs System Property
+ * for the external VM Broker.
+ *
+ * @param property the System property to set
+ */
+ protected void setSystemProperty(String property)
+ {
+ setSystemProperty(property, System.getProperty(property));
+ }
+
+ /**
* Set a System property for the duration of this test.
*
* When the test run is complete the value will be reverted.
*
+ * The values set using this method will also be propogated to the external
+ * Java Broker via a -D value defined in QPID_OPTS.
+ *
+ * If the value should not be set on the broker then use
+ * setTestClientSystemProperty().
+ *
* @param property the property to set
* @param value the new value to use
*/
protected void setSystemProperty(String property, String value)
{
- if (!_setProperties.containsKey(property))
+ // Record the value for the external broker
+ _propertiesSetForBroker.put(property, value);
+
+ //Set the value for the test client vm aswell.
+ setTestClientSystemProperty(property, value);
+ }
+
+ /**
+ * Set a System (-D) property for the external Broker of this test.
+ *
+ * @param property The property to set
+ * @param value the value to set it to.
+ */
+ protected void setTestClientSystemProperty(String property, String value)
+ {
+ if (!_propertiesSetForTestOnly.containsKey(property))
{
- _setProperties.put(property, System.getProperty(property));
- }
+ // Record the current value so we can revert it later.
+ _propertiesSetForTestOnly.put(property, System.getProperty(property));
+ }
System.setProperty(property, value);
}
+ /**
+ * Restore the System property values that were set before this test run.
+ */
protected void revertSystemProperties()
{
- for (String key : _setProperties.keySet())
+ for (String key : _propertiesSetForTestOnly.keySet())
{
- String value = _setProperties.get(key);
+ String value = _propertiesSetForTestOnly.get(key);
if (value != null)
{
System.setProperty(key, value);
@@ -698,6 +803,12 @@ public class QpidTestCase extends TestCase
System.clearProperty(key);
}
}
+
+ _propertiesSetForTestOnly.clear();
+
+ // We don't change the current VMs settings for Broker only properties
+ // so we can just clear this map
+ _propertiesSetForBroker.clear();
}
/**
@@ -712,6 +823,40 @@ public class QpidTestCase extends TestCase
}
/**
+ * Adjust the VMs Log4j Settings just for this test run
+ *
+ * @param logger the logger to change
+ * @param level the level to set
+ */
+ protected void setLoggerLevel(org.apache.log4j.Logger logger, Level level)
+ {
+ assertNotNull("Cannot set level of null logger", logger);
+ assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level);
+
+ if (!_loggerLevelSetForTest.containsKey(logger))
+ {
+ // Record the current value so we can revert it later.
+ _loggerLevelSetForTest.put(logger, logger.getLevel());
+ }
+
+ logger.setLevel(level);
+ }
+
+ /**
+ * Restore the logging levels defined by this test.
+ */
+ protected void revertLoggingLevels()
+ {
+ for (org.apache.log4j.Logger logger : _loggerLevelSetForTest.keySet())
+ {
+ logger.setLevel(_loggerLevelSetForTest.get(logger));
+ }
+
+ _loggerLevelSetForTest.clear();
+
+ }
+
+ /**
* Check whether the broker is an 0.8
*
* @return true if the broker is an 0_8 version, false otherwise.
@@ -786,7 +931,7 @@ public class QpidTestCase extends TestCase
{
if (Boolean.getBoolean("profile.use_ssl"))
{
- _connectionFactory = getConnectionFactory("ssl");
+ _connectionFactory = getConnectionFactory("default.ssl");
}
else
{
@@ -876,15 +1021,32 @@ public class QpidTestCase extends TestCase
return getClass().getSimpleName() + "-" + getName();
}
+ /**
+ * Return a Queue specific for this test.
+ * Uses getTestQueueName() as the name of the queue
+ * @return
+ */
+ public Queue getTestQueue()
+ {
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName());
+ }
+
+
protected void tearDown() throws java.lang.Exception
{
- // close all the connections used by this test.
- for (Connection c : _connections)
+ try
{
- c.close();
+ // close all the connections used by this test.
+ for (Connection c : _connections)
+ {
+ c.close();
+ }
+ }
+ finally{
+ // Ensure any problems with close does not interfer with property resets
+ revertSystemProperties();
+ revertLoggingLevels();
}
-
- revertSystemProperties();
}
/**
@@ -921,17 +1083,23 @@ public class QpidTestCase extends TestCase
public List<Message> sendMessage(Session session, Destination destination,
int count) throws Exception
{
- return sendMessage(session, destination, count, 0);
+ return sendMessage(session, destination, count, 0, 0);
}
public List<Message> sendMessage(Session session, Destination destination,
int count, int batchSize) throws Exception
{
+ return sendMessage(session, destination, count, 0, batchSize);
+ }
+
+ public List<Message> sendMessage(Session session, Destination destination,
+ int count, int offset, int batchSize) throws Exception
+ {
List<Message> messages = new ArrayList<Message>(count);
MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < count; i++)
+ for (int i = offset; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
@@ -950,8 +1118,11 @@ public class QpidTestCase extends TestCase
}
// Ensure we commit the last messages
- if (session.getTransacted() && (batchSize > 0) &&
- (count / batchSize != 0))
+ // Commit the session if we are transacted and
+ // we have no batchSize or
+ // our count is not divible by batchSize.
+ if (session.getTransacted() &&
+ ( batchSize == 0 || count % batchSize != 0))
{
session.commit();
}
@@ -961,7 +1132,11 @@ public class QpidTestCase extends TestCase
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
- return session.createMessage();
+ Message message = session.createMessage();
+ message.setIntProperty(INDEX, msgCount);
+
+ return message;
+
}
public ConnectionURL getConnectionURL() throws NamingException
diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes
index f03d62667d..757a1e425c 100644..100755
--- a/qpid/java/test-profiles/010Excludes
+++ b/qpid/java/test-profiles/010Excludes
@@ -92,3 +92,12 @@ org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
// QPID-2084 : this test needs more work for 0-10
org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#*
+
+// QPID-2118 : 0-10 Java client has differrent error handling to 0-8 code path
+org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
+
+//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker)
+org.apache.qpid.server.queue.ProducerFlowControlTest#*
+
+//QPID-1950 : Commit to test this failure. This is a MINA only failure so it cannot be tested when using 010.
+org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#*
diff --git a/qpid/java/test-profiles/08Excludes b/qpid/java/test-profiles/08Excludes
index b277c6d929..6f3898384d 100644
--- a/qpid/java/test-profiles/08Excludes
+++ b/qpid/java/test-profiles/08Excludes
@@ -14,8 +14,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
// QPID-1823: this takes ages to run
org.apache.qpid.client.SessionCreateTest#*
-org.apache.qpid.test.client.RollbackOrderTest#*
-
// QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable
org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
org.apache.qpid.server.queue.ModelTest#*
diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes
index de876e06bb..ee781fb80f 100644
--- a/qpid/java/test-profiles/08StandaloneExcludes
+++ b/qpid/java/test-profiles/08StandaloneExcludes
@@ -39,8 +39,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
// QPID-1823: this takes ages to run
org.apache.qpid.client.SessionCreateTest#*
-org.apache.qpid.test.client.RollbackOrderTest#*
-
// This test requires the standard configuration file for validation.
// Excluding here does not reduce test coverage.
org.apache.qpid.server.configuration.ServerConfigurationFileTest#*
diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes
index a72d3bc86c..aa60554c04 100644
--- a/qpid/java/test-profiles/Excludes
+++ b/qpid/java/test-profiles/Excludes
@@ -19,3 +19,15 @@ org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#*
// QPID-2081 :The configuration changes are now highlighting the close race condition
org.apache.qpid.server.security.acl.SimpleACLTest#*
+
+// QPID-1816 : Client Ack has not been addressed
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDirtyClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testDirtyClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck
+
+
+// QPID-143 : Failover can occur between receive and ack but we don't stop the ack.
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testAutoAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDupsOk
+
diff --git a/qpid/java/test-profiles/default.testprofile b/qpid/java/test-profiles/default.testprofile
index 86a5b2efb3..28127e5c4c 100644
--- a/qpid/java/test-profiles/default.testprofile
+++ b/qpid/java/test-profiles/default.testprofile
@@ -11,11 +11,14 @@ max_prefetch=1000
log=debug
amqj.logging.level=${log}
+amqj.server.logging.level=${log}
amqj.protocol.logging.level=${log}
root.logging.level=warn
log4j.configuration=file:///${test.profiles}/log4j-test.xml
log4j.debug=false
+# Note test-provider.properties also has variables of same name.
+# Keep in sync
test.port=15672
test.mport=18999
#Note : Management will start open second port on: mport + 100 : 19099
diff --git a/qpid/java/test-profiles/log4j-test.xml b/qpid/java/test-profiles/log4j-test.xml
index 0aaa7d8686..2d77942a81 100644
--- a/qpid/java/test-profiles/log4j-test.xml
+++ b/qpid/java/test-profiles/log4j-test.xml
@@ -29,10 +29,10 @@
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
+ <param name="ImmediateFlush" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%t %d %p [%c{4}] %m%n"/>
</layout>
- <param name="ImmediateFlush" value="true"/>
</appender>
<logger name="org.apache.qpid">
@@ -46,6 +46,10 @@
<logger name="org.apache.qpid.test.utils.QpidTestCase">
<level value="ALL"/>
</logger>
+
+ <logger name="org.apache.commons">
+ <level value="WARN"/>
+ </logger>
<root>
<level value="${root.logging.level}"/>
diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties
index e5cb18da0b..8cea012c1d 100644
--- a/qpid/java/test-profiles/test-provider.properties
+++ b/qpid/java/test-profiles/test-provider.properties
@@ -19,20 +19,23 @@
#
#
-test.port=5672
-test.port.ssl=5671
-test.port.alt=5772
-test.port.alt.ssl=5771
+# Copied from default.testprofile
+test.port=15672
+test.mport=18999
+#Note : Java Management will start open second port on: mport + 100 : 19099
+test.port.ssl=15671
+test.port.alt=25672
+test.port.alt.ssl=25671
+
connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
+connectionfactory.default.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
-connectionfactory.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-
connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
+connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'&failover='roundrobin?cyclecount='20''
-connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}'
connectionfactory.connection1.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'