summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-05 13:29:44 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-05 13:29:44 +0000
commitfd4934ce9155cb52315a46de4cf97d71ee1cefa1 (patch)
treea53e4b8145293c2ab71cfc01a208b28e8c809c4c
parentf67026b148b3429ca9731bce94a0bd3ff2981ce5 (diff)
downloadqpid-python-fd4934ce9155cb52315a46de4cf97d71ee1cefa1.tar.gz
Merged from trunk up to r820933
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@821793 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java110
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java104
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java204
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java36
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java5
-rw-r--r--qpid/java/client/src/main/java/log4j.xml36
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java105
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java13
-rwxr-xr-xqpid/java/common/bin/qpid-run1
-rw-r--r--qpid/java/module.xml2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java324
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java170
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java124
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java173
-rwxr-xr-x[-rw-r--r--]qpid/java/test-profiles/010Excludes7
-rw-r--r--qpid/java/test-profiles/08Excludes2
-rw-r--r--qpid/java/test-profiles/08StandaloneExcludes2
-rw-r--r--qpid/java/test-profiles/default.testprofile3
-rw-r--r--qpid/java/test-profiles/log4j-test.xml6
-rw-r--r--qpid/java/test-profiles/test-provider.properties12
45 files changed, 1325 insertions, 273 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index c4c73842bc..4f24f7ddc0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -27,13 +27,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -42,12 +41,8 @@ import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -60,6 +55,7 @@ import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
@@ -120,6 +116,11 @@ public class AMQChannel
private final AMQProtocolSession _session;
private boolean _closing;
+ private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+ private final AtomicBoolean _blocking = new AtomicBoolean(false);
+
+
private LogActor _actor;
private LogSubject _logSubject;
@@ -798,16 +799,32 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
-
+ /**
+ * Called from the ChannelFlowHandler to suspend this Channel
+ * @param suspended boolean, should this Channel be suspended
+ */
public void setSuspended(boolean suspended)
{
-
-
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
- _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+ // Log Flow Started before we start the subscriptions
+ if (!suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Started"));
+ }
+
+
+ // This section takes two different approaches to perform to perform
+ // the same function. Ensuring that the Subscription has taken note
+ // of the change in Channel State
+ // Here we have become unsuspended and so we ask each the queue to
+ // perform an Async delivery for each of the subscriptions in this
+ // Channel. The alternative would be to ensure that the subscription
+ // had received the change in suspension state. That way the logic
+ // behind decieding to start an async delivery was located with the
+ // Subscription.
if (wasSuspended)
{
// may need to deliver queued messages
@@ -816,6 +833,38 @@ public class AMQChannel
s.getQueue().deliverAsync(s);
}
}
+
+
+ // Here we have become suspended so we need to ensure that each of
+ // the Subscriptions has noticed this change so that we can be sure
+ // they are not still sending messages. Again the code here is a
+ // very simplistic approach to ensure that the change of suspension
+ // has been noticed by each of the Subscriptions. Unlike the above
+ // case we don't actually need to do anything else.
+ if (!wasSuspended)
+ {
+ // may need to deliver queued messages
+ for (Subscription s : _tag2SubscriptionMap.values())
+ {
+ try
+ {
+ s.getSendLock();
+ }
+ finally
+ {
+ s.releaseSendLock();
+ }
+ }
+ }
+
+
+ // Log Suspension only after we have confirmed all suspensions are
+ // stopped.
+ if (suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Stopped"));
+ }
+
}
}
@@ -967,4 +1016,37 @@ public class AMQChannel
{
return _actor;
}
+
+ public void block(AMQQueue queue)
+ {
+ if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+ {
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString()));
+ flow(false);
+ }
+ }
+ }
+
+ public void unblock(AMQQueue queue)
+ {
+ if(_blockingQueues.remove(queue))
+ {
+ if(_blocking.compareAndSet(true,false))
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1006());
+
+ flow(true);
+ }
+ }
+ }
+
+ private void flow(boolean flow)
+ {
+ MethodRegistry methodRegistry = _session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
+ _session.writeFrame(responseBody.generateFrame(_channelId));
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 74bb7ee969..5c73e353de 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -108,4 +108,14 @@ public class QueueConfiguration
return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap());
}
+ public long getCapacity()
+ {
+ return _config.getLong("capacity", _vHostConfig.getCapacity());
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index ed9d8acc08..01befbbfe8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -103,6 +103,8 @@ public class ServerConfiguration implements SignalHandler
envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth");
envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize");
envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap");
+ envVarMap.put("QPID_QUEUECAPACITY", "capacity");
+ envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity");
envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer");
envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
@@ -289,7 +291,6 @@ public class ServerConfiguration implements SignalHandler
return conf;
}
- @Override
public void handle(Signal arg0)
{
try
@@ -507,6 +508,16 @@ public class ServerConfiguration implements SignalHandler
return getConfig().getLong("minimumAlertRepeatGap", 0);
}
+ public long getCapacity()
+ {
+ return getConfig().getLong("capacity", 0L);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return getConfig().getLong("flowResumeCapacity", getCapacity());
+ }
+
public int getProcessors()
{
return getConfig().getInt("connector.processors", 4);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 0273a13262..6c72025ec2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -166,4 +166,15 @@ public class VirtualHostConfiguration
return _config.getLong("queues.minimumAlertRepeatGap", 0);
}
+
+ public long getCapacity()
+ {
+ return _config.getLong("queues.capacity", 0l);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("queues.flowResumeCapacity", getCapacity());
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
index 4502710dd6..0059a48c06 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
@@ -60,4 +60,9 @@ public abstract class AbstractActor implements LogActor
return _rootLogger;
}
+ public String toString()
+ {
+ return _logString;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
index 9b928accc0..5d2762fd1d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
@@ -36,4 +36,12 @@ public class BrokerActor extends AbstractActor
_logString = "[Broker] ";
}
+
+ public BrokerActor(String name, RootMessageLogger logger)
+ {
+ super(logger);
+
+ _logString = "[Broker(" + name + ")] ";
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
index 5ced7cc0b9..9169a1a651 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
@@ -249,12 +249,16 @@ CHN-1003 = Close
# 0 - bytes allowed in prefetch
# 1 - number of messagse.
CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
#Queue
# 0 - owner
# 1 - priority
QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
#Exchange
# 0 - type
@@ -269,4 +273,4 @@ BND-1002 = Deleted
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
-SUB-1003 = State : {0} \ No newline at end of file
+SUB-1003 = State : {0}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index b5f6711c93..7daa10578f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.PrincipalHolder;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -170,6 +171,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void setMinimumAlertRepeatGap(long value);
+ long getCapacity();
+
+ void setCapacity(long capacity);
+
+
+ long getFlowResumeCapacity();
+
+ void setFlowResumeCapacity(long flowResumeCapacity);
+
+
+
void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
long clearQueue(StoreContext storeContext) throws AMQException;
@@ -194,6 +206,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
Map<String, Object> getArguments();
+ void checkCapacity(AMQChannel channel);
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 71831efc7b..71ad05ce25 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -27,12 +27,104 @@ import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
+import java.util.HashMap;
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ private abstract static class QueueProperty
+ {
+
+ private final AMQShortString _argumentName;
+
+
+ public QueueProperty(String argumentName)
+ {
+ _argumentName = new AMQShortString(argumentName);
+ }
+
+ public AMQShortString getArgumentName()
+ {
+ return _argumentName;
+ }
+
+
+ public abstract void setPropertyValue(AMQQueue queue, Object value);
+
+ }
+
+ private abstract static class QueueLongProperty extends QueueProperty
+ {
+
+ public QueueLongProperty(String argumentName)
+ {
+ super(argumentName);
+ }
+
+ public void setPropertyValue(AMQQueue queue, Object value)
+ {
+ if(value instanceof Number)
+ {
+ setPropertyValue(queue, ((Number)value).longValue());
+ }
+
+ }
+
+ abstract void setPropertyValue(AMQQueue queue, long value);
+
+
+ }
+
+ private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
+ new QueueLongProperty("x-qpid-maximum-message-age")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageAge(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-size")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageSize(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-count")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageCount(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMinimumAlertRepeatGap(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setCapacity(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-flow-resume-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setFlowResumeCapacity(value);
+ }
+ }
+
+ };
+
+
+
public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
@@ -55,6 +147,18 @@ public class AMQQueueFactory
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+
+ if(arguments != null)
+ {
+ for(QueueProperty p : DECLAREABLE_PROPERTIES)
+ {
+ if(arguments.containsKey(p.getArgumentName()))
+ {
+ p.setPropertyValue(q, arguments.get(p.getArgumentName()));
+ }
+ }
+ }
+
return q;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 67bc87145a..4f43695b13 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -46,7 +46,6 @@ public class QueueEntryImpl implements QueueEntry
private MessageReference _message;
-
private Set<Subscription> _rejectedBy = null;
private volatile EntryState _state = AVAILABLE_STATE;
@@ -198,7 +197,6 @@ public class QueueEntryImpl implements QueueEntry
_stateUpdater.set(this,AVAILABLE_STATE);
}
-
public boolean immediateAndNotDelivered()
{
return getMessage().isImmediate() && !_deliveredToConsumer;
@@ -419,4 +417,5 @@ public class QueueEntryImpl implements QueueEntry
{
return _queueEntryList;
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b040993d0b..ab5972b26f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -4,6 +4,8 @@ import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.management.JMException;
@@ -30,6 +32,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.AMQChannel;
/*
*
@@ -123,6 +126,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final Executor _asyncDelivery;
private final AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
/** max allowed size(KB) of a single message */
public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -149,6 +154,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private LogSubject _logSubject;
private LogActor _logActor;
+
+ private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+ private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -575,6 +584,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_deliveredMessages.incrementAndGet();
sub.send(entry);
+
setLastSeenEntry(sub,entry);
}
@@ -684,6 +694,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new FailedDequeueException(_name.toString(), e);
}
+ checkCapacity();
+
}
private void decrementQueueSize(final QueueEntry entry)
@@ -1233,11 +1245,61 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- public void deliverAsync()
+ public void checkCapacity(AMQChannel channel)
+ {
+ if(_capacity != 0l)
+ {
+ if(_atomicQueueSize.get() > _capacity)
+ {
+ //Overfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
+
+ if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
+ {
+ channel.block(this);
+ }
+
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+
+ //Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+ channel.unblock(this);
+ _blockedChannels.remove(channel);
+
+ }
+
+ }
+
+
+
+ }
+ }
+
+ private void checkCapacity()
{
- _stateChangeCount.incrementAndGet();
+ if(_capacity != 0L)
+ {
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+ //Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
- Runner runner = new Runner();
+ for(AMQChannel c : _blockedChannels.keySet())
+ {
+ c.unblock(this);
+ _blockedChannels.remove(c);
+ }
+ }
+ }
+ }
+
+
+ public void deliverAsync()
+ {
+ Runner runner = new Runner(_stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1250,13 +1312,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(new SubFlushRunner(sub));
}
+
private class Runner implements ReadWriteRunnable
{
+ String _name;
+ public Runner(long count)
+ {
+ _name = "QueueRunner-" + count + "-" + _logActor;
+ }
+
public void run()
{
+ String originalName = Thread.currentThread().getName();
try
{
+ Thread.currentThread().setName(_name);
CurrentActor.set(_logActor);
+
processQueue(this);
}
catch (AMQException e)
@@ -1266,9 +1338,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
finally
{
CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
}
-
-
}
public boolean isRead()
@@ -1280,6 +1351,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return true;
}
+
+ public String toString()
+ {
+ return _name;
+ }
}
private class SubFlushRunner implements ReadWriteRunnable
@@ -1293,27 +1369,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void run()
{
- boolean complete = false;
- try
- {
- CurrentActor.set(_sub.getLogActor());
- complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
- }
- catch (AMQException e)
- {
- _logger.error(e);
+ String originalName = Thread.currentThread().getName();
+ try{
+ Thread.currentThread().setName("SubFlushRunner-"+_sub);
+
+ boolean complete = false;
+ try
+ {
+ CurrentActor.set(_sub.getLogActor());
+ complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ if (!complete && !_sub.isSuspended())
+ {
+ _asyncDelivery.execute(this);
+ }
}
finally
{
- CurrentActor.remove();
- }
- if (!complete && !_sub.isSuspended())
- {
- _asyncDelivery.execute(this);
+ Thread.currentThread().setName(originalName);
}
-
}
public boolean isRead()
@@ -1341,7 +1426,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
sub.getSendLock();
- atTail = attemptDelivery(sub);
+ atTail = attemptDelivery(sub);
if (atTail && sub.isAutoClose())
{
unregisterSubscription(sub);
@@ -1371,44 +1456,48 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return atTail;
}
+ /**
+ * Attempt delivery for the given subscription.
+ *
+ * Looks up the next node for the subscription and attempts to deliver it.
+ *
+ * @param sub
+ * @return true if we have completed all possible deliveries for this sub.
+ * @throws AMQException
+ */
private boolean attemptDelivery(Subscription sub) throws AMQException
{
boolean atTail = false;
- boolean subActive = sub.isActive();
+ boolean subActive = sub.isActive() && !sub.isSuspended();
if (subActive)
{
-
QueueEntry node = getNextAvailableEntry(sub);
if (node != null && !(node.isAcquired() || node.isDeleted()))
{
- if (!sub.isSuspended())
+ if (sub.hasInterest(node))
{
- if (sub.hasInterest(node))
+ if (!sub.wouldSuspend(node))
{
- if (!sub.wouldSuspend(node))
+ if (sub.acquires() && !node.acquire(sub))
{
- if (sub.acquires() && !node.acquire(sub))
- {
- sub.onDequeue(node);
- }
- else
- {
- deliverMessage(sub, node);
- }
-
+ sub.onDequeue(node);
}
- else // Not enough Credit for message and wouldSuspend
+ else
{
- //QPID-1187 - Treat the subscription as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
- node.addStateChangeListener(new QueueEntryListener(sub, node));
+ deliverMessage(sub, node);
}
- }
+ }
+ else // Not enough Credit for message and wouldSuspend
+ {
+ //QPID-1187 - Treat the subscription as suspended for this message
+ // and wait for the message to be removed to continue delivery.
+ subActive = false;
+ node.addStateChangeListener(new QueueEntryListener(sub, node));
+ }
}
}
@@ -1488,6 +1577,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asynchronousRunner.compareAndSet(runner, null);
+ // For every message enqueue/requeue the we fire deliveryAsync() which
+ // increases _stateChangeCount. If _sCC changes whilst we are in our loop
+ // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
+ // then we will continue to run for a maximum of iterations.
+ // So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
{
// we want to have one extra loop after every subscription has reached the point where it cannot move
@@ -1507,7 +1601,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//iterate over the subscribers and try to advance their pointer
while (subscriptionIter.advance())
{
- boolean closeConsumer = false;
Subscription sub = subscriptionIter.getNode().getSubscription();
sub.getSendLock();
try
@@ -1551,6 +1644,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rescheduling runner:" + runner);
+ }
_asyncDelivery.execute(runner);
}
}
@@ -1663,6 +1760,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ public long getCapacity()
+ {
+ return _capacity;
+ }
+
+ public void setCapacity(long capacity)
+ {
+ _capacity = capacity;
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _flowResumeCapacity;
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ _flowResumeCapacity = flowResumeCapacity;
+ }
+
+
public Set<NotificationCheck> getNotificationChecks()
{
return _notificationChecks;
@@ -1732,6 +1850,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setMaximumMessageSize(config.getMaximumMessageSize());
setMaximumMessageCount(config.getMaximumMessageCount());
setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ _capacity = config.getCapacity();
+ _flowResumeCapacity = config.getFlowResumeCapacity();
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index b6137e83de..7511b5c818 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -102,7 +102,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
- instance.initialise();
+ instance.initialise(instanceID);
}
catch (Exception e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index a049d1eb09..831f928832 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -42,18 +42,22 @@ import java.io.File;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
+ private String _registryName;
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
super(new ServerConfiguration(configurationURL));
}
- public void initialise() throws Exception
+ public void initialise(int instanceID) throws Exception
{
_rootMessageLogger = new RootMessageLoggerImpl(_configuration,
new Log4jMessageLogger());
+
+ _registryName = String.valueOf(instanceID);
+
// Set the Actor for current log messages
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
+ CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));
@@ -83,7 +87,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void close() throws Exception
{
//Set the Actor for Broker Shutdown
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
+ CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
try
{
super.close();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index ddb3fce5d2..92accb3499 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -43,8 +43,9 @@ public interface IApplicationRegistry
* Initialise the application registry. All initialisation must be done in this method so that any components
* that need access to the application registry itself for initialisation are able to use it. Attempting to
* initialise in the constructor will lead to failures since the registry reference will not have been set.
+ * @param instanceID the instanceID that we can use to identify this AR.
*/
- void initialise() throws Exception;
+ void initialise(int instanceID) throws Exception;
/**
* Shutdown this Registry
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index ebec6af9f0..27a6f14b03 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -99,6 +99,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
StoreContext.setCurrentContext(getStoreContext());
QueueEntry entry = _queue.enqueue(_message);
+ _queue.checkCapacity(_channel);
if(entry.immediateAndNotDelivered())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 76cb7a397d..dd461bad47 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -96,6 +96,7 @@ public class NonTransactionalContext implements TransactionalContext
StoreContext.clearCurrentContext();
+ queue.checkCapacity(_channel);
//following check implements the functionality
//required by the 'immediate' flag:
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 8e1a6d2348..1d8400f736 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
@@ -280,6 +281,15 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean getBlockOnQueueFull()
+ {
+ return false;
+ }
+
+ public void setBlockOnQueueFull(boolean block)
+ {
+ }
+
public long getMinimumAlertRepeatGap()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
@@ -295,7 +305,6 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
-
public void checkMessageStatus() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -336,6 +345,10 @@ public class MockAMQQueue implements AMQQueue
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void checkCapacity(AMQChannel channel)
+ {
+ }
+
public ManagedObject getManagedObject()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -346,12 +359,31 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
-
public void setMinimumAlertRepeatGap(long value)
{
}
+ public long getCapacity()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setCapacity(long capacity)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void configure(QueueConfiguration config)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
index e75ed640aa..4c8ff01be0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.registry;
import junit.framework.TestCase;
import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.AMQException;
import java.security.Security;
import java.security.Provider;
@@ -69,7 +68,7 @@ public class ApplicationRegistryShutdownTest extends TestCase
// Register new providers
try
{
- _registry.initialise();
+ _registry.initialise(ApplicationRegistry.DEFAULT_INSTANCE);
}
catch (Exception e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 8fef8baa02..705e7d2ad7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -50,7 +50,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
super(new ServerConfiguration(new PropertiesConfiguration()));
}
- public void initialise() throws Exception
+ public void initialise(int instanceID) throws Exception
{
_logger.info("Initialising NullApplicationRegistry");
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 43948c05c4..7b7c86bb80 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.util;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -31,7 +30,6 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
-import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
@@ -45,7 +43,6 @@ import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Properties;
import java.util.Arrays;
@@ -75,7 +72,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
_config = config;
}
- public void initialise() throws Exception
+ public void initialise(int instanceID) throws Exception
{
_rootMessageLogger = new RootMessageLoggerImpl(_configuration,
new Log4jMessageLogger());
diff --git a/qpid/java/client/src/main/java/log4j.xml b/qpid/java/client/src/main/java/log4j.xml
new file mode 100644
index 0000000000..c27acba818
--- /dev/null
+++ b/qpid/java/client/src/main/java/log4j.xml
@@ -0,0 +1,36 @@
+<!--
+
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+-->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-5p %c{1} - %m%n"/>
+ </layout>
+ </appender>
+
+ <logger name="org.apache.qpid">
+ <level value="warn"/>
+ <appender-ref ref="console" />
+ </logger>
+
+</log4j:configuration>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0d2adcec8a..5659ce0474 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -450,7 +450,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else
{
- // use the defaul value set for all connections
+ // use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
@@ -512,7 +512,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
boolean retryAllowed = true;
Exception connectionException = null;
- while (!_connected && retryAllowed)
+ while (!_connected && retryAllowed && brokerDetails != null)
{
ProtocolVersion pe = null;
try
@@ -691,12 +691,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public boolean attemptReconnection()
{
- while (_failoverPolicy.failoverAllowed())
+ BrokerDetails broker = null;
+ while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
{
try
{
- makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
-
+ makeBrokerConnection(broker);
return true;
}
catch (Exception e)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 2e3e417c95..fa15df34ec 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -91,6 +91,7 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
+import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,6 +115,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
+
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -198,16 +200,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
*/
- protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+ protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by producers created by this session is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
- protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+ protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+ protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
+ /**
+ * The period to wait while flow controlled before sending a log message confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+ protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+ DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
protected final boolean DECLARE_QUEUES =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+
protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
@@ -778,7 +796,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
@@ -1559,6 +1577,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
suspendChannel(true);
}
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+
+ syncDispatchQueue();
+
+ _dispatcher.rollback();
+
releaseForRollback();
sendRollback();
@@ -1851,26 +1877,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void failoverPrep()
{
- startDispatcherIfNecessary();
syncDispatchQueue();
}
void syncDispatchQueue()
{
- final CountDownLatch signal = new CountDownLatch(1);
- _queue.add(new Dispatchable() {
- public void dispatch(AMQSession ssn)
+ if (Thread.currentThread() == _dispatcherThread)
+ {
+ while (!_closed.get() && !_queue.isEmpty())
{
- signal.countDown();
+ Dispatchable disp;
+ try
+ {
+ disp = (Dispatchable) _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // Check just in case _queue becomes empty, it shouldn't but
+ // better than an NPE.
+ if (disp == null)
+ {
+ _logger.debug("_queue became empty during sync.");
+ break;
+ }
+
+ disp.dispatch(AMQSession.this);
}
- });
- try
- {
- signal.await();
}
- catch (InterruptedException e)
+ else
{
- throw new RuntimeException(e);
+ startDispatcherIfNecessary();
+
+ final CountDownLatch signal = new CountDownLatch(1);
+
+ _queue.add(new Dispatchable()
+ {
+ public void dispatch(AMQSession ssn)
+ {
+ signal.countDown();
+ }
+ });
+
+ try
+ {
+ signal.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
@@ -2233,7 +2291,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, false);
+ return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
}
private P createProducerImpl(final Destination destination, final boolean mandatory,
@@ -2704,15 +2762,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
+ _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
}
- public void checkFlowControl() throws InterruptedException
+ public void checkFlowControl() throws InterruptedException, JMSException
{
+ long expiryTime = 0L;
synchronized (_flowControl)
{
- while (!_flowControl.getFlowControl())
+ while (!_flowControl.getFlowControl() &&
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+ : expiryTime) >= System.currentTimeMillis() )
+ {
+
+ _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
+ _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
+ if(!_flowControl.getFlowControl())
{
- _flowControl.wait();
+ _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+ throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 0644bd88a8..1587d6a6bf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -414,9 +414,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void releaseForRollback()
{
- startDispatcherIfNecessary();
- syncDispatchQueue();
- _dispatcher.rollback();
getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
_txRangeSet.clear();
_txSize = 0;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index d7196c0abb..bc1453beaf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -195,6 +195,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public void releaseForRollback()
{
+ // Reject all the messages that have been received in this session and
+ // have not yet been acknowledged. Should look to remove
+ // _deliveredMessageTags and use _txRangeSet as used by 0-10.
+ // Otherwise messages will be able to arrive out of order to a second
+ // consumer on the queue. Whilst this is within the JMS spec it is not
+ // user friendly and avoidable.
while (true)
{
Long tag = _deliveredMessageTags.poll();
@@ -205,11 +211,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
rejectMessage(tag, true);
}
-
- if (_dispatcher != null)
- {
- _dispatcher.rollback();
- }
}
public void rejectMessage(long deliveryTag, boolean requeue)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 5ff6066ddc..44ce59975a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -60,7 +60,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
/**
* Priority of messages created by this producer.
*/
- private int _messagePriority;
+ private int _messagePriority = Message.DEFAULT_PRIORITY;
/**
* Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index 9c791730ca..0e3333940a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -39,6 +39,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance();
private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
@@ -159,7 +160,8 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- return false;
+ _channelFlowMethodHandler.methodReceived(_session, body, channelId);
+ return true;
}
public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
index e05a7ab6e2..960661daea 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
@@ -189,7 +189,8 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener
{
synchronized (_brokerListLock)
{
- return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+ _currentBrokerDetail = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+ return _currentBrokerDetail;
}
}
@@ -214,7 +215,15 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener
broker.getHost().equals(_currentBrokerDetail.getHost()) &&
broker.getPort() == _currentBrokerDetail.getPort())
{
- return getNextBrokerDetails();
+ if (_connectionDetails.getBrokerCount() > 1)
+ {
+ return getNextBrokerDetails();
+ }
+ else
+ {
+ _failedAttemps ++;
+ return null;
+ }
}
String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
diff --git a/qpid/java/common/bin/qpid-run b/qpid/java/common/bin/qpid-run
index 0b5070d937..63bb648fd8 100755
--- a/qpid/java/common/bin/qpid-run
+++ b/qpid/java/common/bin/qpid-run
@@ -111,6 +111,7 @@ if [ -n "$QPID_LOG_SUFFIX" ]; then
fi
log $INFO System Properties set to $SYSTEM_PROPS
+log $INFO QPID_OPTS set to $QPID_OPTS
program=$(basename $0)
sourced=${BASH_SOURCE[0]}
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 0c32414647..5796af928a 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -261,6 +261,7 @@
<jvmarg value="${jvm.args}"/>
<sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
+ <sysproperty key="amqj.server.logging.level" value="${amqj.server.logging.level}"/>
<sysproperty key="amqj.protocol.logging.level" value="${amqj.protocol.logging.level}"/>
<sysproperty key="log4j.debug" value="${log4j.debug}"/>
<sysproperty key="root.logging.level" value="${root.logging.level}"/>
@@ -269,6 +270,7 @@
<sysproperty key="java.naming.provider.url" value="${java.naming.provider.url}"/>
<sysproperty key="broker" value="${broker}"/>
<sysproperty key="broker.clean" value="${broker.clean}"/>
+ <sysproperty key="broker.clean.between.tests" value="${broker.clean.between.tests}"/>
<sysproperty key="broker.version" value="${broker.version}"/>
<sysproperty key="broker.ready" value="${broker.ready}" />
<sysproperty key="broker.stopped" value="${broker.stopped}" />
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
index 52120019f5..e7975f8d24 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
@@ -78,15 +78,23 @@ public class BrokerStartupTest extends AbstractTestLogging
// Add an invalid value
_broker += " -l invalid";
- // The release-bin build of the broker uses this log4j configuration
- // so set up the broker environment to use it for this test.
- // Also include -Dlog4j.debug so we can validate that it picked up this config
- setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug -Dlog4j.configuration=file:" + System.getProperty(QPID_HOME) + "/../broker/src/main/java/log4j.properties");
+ // The broker has a built in default log4j configuration set up
+ // so if the the broker cannot load the -l value it will use default
+ // use this default. Test that this is correctly loaded, by
+ // including -Dlog4j.debug so we can validate.
+ setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug");
// Disable all client logging so we can test for broker DEBUG only.
- Logger.getRootLogger().setLevel(Level.WARN);
- Logger.getLogger("qpid.protocol").setLevel(Level.WARN);
- Logger.getLogger("org.apache.qpid").setLevel(Level.WARN);
+ setLoggerLevel(Logger.getRootLogger(), Level.WARN);
+ setLoggerLevel(Logger.getLogger("qpid.protocol"), Level.WARN);
+ setLoggerLevel(Logger.getLogger("org.apache.qpid"), Level.WARN);
+
+ // Set the broker to use info level logging, which is the qpid-server
+ // default. Rather than debug which is the test default.
+ setBrokerOnlySystemProperty("amqj.server.logging.level", "info");
+ // Set the logging defaults to info for this test.
+ setBrokerOnlySystemProperty("amqj.logging.level", "info");
+ setBrokerOnlySystemProperty("root.logging.level", "info");
startBroker();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
index 4f50aba61d..b00a71315e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
@@ -160,7 +160,7 @@ public class BrokerLoggingTest extends AbstractTestLogging
// Set the broker.ready string to check for the _log4j default that
// is still present on standard out.
- System.setProperty(BROKER_READY, "Qpid Broker Ready");
+ setTestClientSystemProperty(BROKER_READY, "Qpid Broker Ready");
startBroker();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
new file mode 100644
index 0000000000..97147904e1
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -0,0 +1,324 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ProducerFlowControlTest extends QpidTestCase
+{
+ private static final int TIMEOUT = 1500;
+
+
+ private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
+
+ protected final String QUEUE = "ProducerFlowControl";
+
+ private static final int MSG_COUNT = 50;
+
+ private Connection producerConnection;
+ private MessageProducer producer;
+ private Session producerSession;
+ private Queue queue;
+ private Connection consumerConnection;
+ private Session consumerSession;
+
+
+ private MessageConsumer consumer;
+ private final AtomicInteger _sentMessages = new AtomicInteger();
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ producerConnection = getConnection();
+ producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ producerConnection.start();
+
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ producerConnection.close();
+ consumerConnection.close();
+ super.tearDown();
+ }
+
+ public void testCapacityExceededCausesBlock()
+ throws JMSException, NamingException, AMQException, InterruptedException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",800);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get());
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message not sent after two messages received", 5, _sentMessages.get());
+
+ }
+
+
+ public void testFlowControlOnCapacityResumeEqual()
+ throws JMSException, NamingException, AMQException, InterruptedException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",1000);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get());
+
+
+ }
+
+
+ public void testFlowControlSoak()
+ throws Exception, NamingException, AMQException, InterruptedException
+ {
+ _sentMessages.set(0);
+ final int numProducers = 10;
+ final int numMessages = 100;
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",6000);
+ arguments.put("x-qpid-flow-resume-capacity",3000);
+
+ ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
+
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
+ consumerConnection.start();
+
+ Connection[] producers = new Connection[numProducers];
+ for(int i = 0 ; i < numProducers; i ++)
+ {
+
+ producers[i] = getConnection();
+ producers[i].start();
+ Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer myproducer = session.createProducer(queue);
+ MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L);
+ }
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ for(int j = 0; j < numProducers * numMessages; j++)
+ {
+
+ Message msg = consumer.receive(5000);
+ Thread.sleep(50L);
+ assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg);
+
+ }
+
+
+
+ Message msg = consumer.receive(500);
+ assertNull("extra message received", msg);
+
+
+ for(int i = 0; i < numProducers; i++)
+ {
+ producers[i].close();
+ }
+
+ }
+
+
+
+ public void testSendTimeout()
+ throws JMSException, NamingException, AMQException, InterruptedException
+ {
+ long origTimeoutValue = Long.getLong("qpid.flow_control_wait_failure",AMQSession.DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+ System.setProperty("qpid.flow_control_wait_failure","3000");
+ Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",800);
+ ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).declareAndBind((AMQDestination)queue);
+ producer = session.createProducer(queue);
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(10000);
+
+ Exception e = sender.getException();
+
+ assertNotNull("No timeout exception on sending", e);
+
+ System.setProperty("qpid.flow_control_wait_failure",String.valueOf(origTimeoutValue));
+
+
+
+ }
+
+ private MessageSender sendMessagesAsync(final MessageProducer producer,
+ final Session producerSession,
+ final int numMessages,
+ long sleepPeriod)
+ {
+ MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod);
+ new Thread(sender).start();
+ return sender;
+ }
+
+ private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+ throws JMSException
+ {
+
+ for (int msg = 0; msg < numMessages; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ _sentMessages.incrementAndGet();
+
+ try
+ {
+ Thread.sleep(sleepPeriod);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+
+ private static final byte[] BYTE_300 = new byte[300];
+
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_300);
+ send.setIntProperty("msg", msg);
+
+ return send;
+ }
+
+
+ private class MessageSender implements Runnable
+ {
+ private final MessageProducer _producer;
+ private final Session _producerSession;
+ private final int _numMessages;
+
+
+
+ private JMSException _exception;
+ private long _sleepPeriod;
+
+ public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+ {
+ _producer = producer;
+ _producerSession = producerSession;
+ _numMessages = numMessages;
+ _sleepPeriod = sleepPeriod;
+ }
+
+ public void run()
+ {
+ try
+ {
+ sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod);
+ }
+ catch (JMSException e)
+ {
+ _exception = e;
+ }
+ }
+
+ public JMSException getException()
+ {
+ return _exception;
+ }
+ }
+} \ No newline at end of file
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index 62b54d3086..2e625f95c0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -61,7 +61,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
setupSession();
- _queue = _clientSession.createQueue(getName()+System.currentTimeMillis());
+ _queue = _clientSession.createQueue(getTestQueueName());
_clientSession.createConsumer(_queue).close();
//Ensure there are no messages on the queue to start with.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
index 39e2b892a9..ff766c907d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
@@ -22,64 +22,172 @@ package org.apache.qpid.test.client;
import org.apache.qpid.test.utils.*;
import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import junit.framework.ComparisonFailure;
+import junit.framework.AssertionFailedError;
/**
- * RollbackOrderTest
+ * RollbackOrderTest, QPID-1864, QPID-1871
+ *
+ * Description:
+ *
+ * The problem that this test is exposing is that the dispatcher used to be capable
+ * of holding on to a message when stopped. This ment that when the rollback was
+ * called and the dispatcher stopped it may have hold of a message. So after all
+ * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
+ * have been cleared the client still had a single message, the one the
+ * dispatcher was holding on to.
+ *
+ * As a result the TxRollback operation would run and then release the dispatcher.
+ * Whilst the dispatcher would then proceed to reject the message it was holiding
+ * the Broker would already have resent that message so the rejection would silently
+ * fail.
+ *
+ * And the client would receieve that single message 'early', depending on the
+ * number of messages already recevied when rollback was called.
+ *
+ *
+ * Aims:
+ *
+ * The tests puts 50 messages on to the queue.
+ *
+ * The test then tries to cause the dispatcher to stop whilst it is in the process
+ * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
+ *
+ * To exercise this path we have 50 message flowing to the client to give the
+ * dispatcher a bit of work to do moving messages.
+ *
+ * Then we loop - 10 times
+ * - Validating that the first message received is always message 1.
+ * - Receive a few more so that there are a few messages to reject.
+ * - call rollback, to try and catch the dispatcher mid process.
+ *
+ * Outcome:
+ *
+ * The hope is that we catch the dispatcher mid process and cause a BasicReject
+ * to fail. Which will be indicated in the log but will also cause that failed
+ * rejected message to be the next to be delivered which will not be message 1
+ * as expected.
+ *
+ * We are testing a race condition here but we can check through the log file if
+ * the race condition occured. However, performing that check will only validate
+ * the problem exists and will not be suitable as part of a system test.
*
*/
-
public class RollbackOrderTest extends QpidTestCase
{
- private Connection conn;
- private Queue queue;
- private Session ssn;
- private MessageProducer prod;
- private MessageConsumer cons;
+ private Connection _connection;
+ private Queue _queue;
+ private Session _session;
+ private MessageConsumer _consumer;
@Override public void setUp() throws Exception
{
super.setUp();
- conn = getConnection();
- conn.start();
- ssn = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- queue = ssn.createQueue("rollback-order-test-queue");
- prod = ssn.createProducer(queue);
- cons = ssn.createConsumer(queue);
- for (int i = 0; i < 5; i++)
- {
- TextMessage msg = ssn.createTextMessage("message " + (i+1));
- prod.send(msg);
- }
- ssn.commit();
+ _connection = getConnection();
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queue = _session.createQueue(getTestQueueName());
+ _consumer = _session.createConsumer(_queue);
+
+ //Send more messages so it is more likely that the dispatcher is
+ // processing on rollback.
+ sendMessage(_session, _queue, 50);
+ _session.commit();
+
}
public void testOrderingAfterRollback() throws Exception
{
- for (int i = 0; i < 10; i++)
+ //Start the session now so we
+ _connection.start();
+
+ for (int i = 0; i < 20; i++)
{
- TextMessage msg = (TextMessage) cons.receive();
- assertEquals("message 1", msg.getText());
- ssn.rollback();
+ Message msg = _consumer.receive();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ // Pull additional messages through so we have some reject work to do
+ for (int m=0; m < 5 ; m++)
+ {
+ _consumer.receive();
+ }
+
+ System.err.println("ROT-Rollback");
+ _logger.warn("ROT-Rollback");
+ _session.rollback();
}
}
- @Override public void tearDown() throws Exception
+ public void testOrderingAfterRollbackOnMessage() throws Exception
{
- while (true)
+ final CountDownLatch count= new CountDownLatch(20);
+ final Exception exceptions[] = new Exception[20];
+ final AtomicBoolean failed = new AtomicBoolean(false);
+
+ _consumer.setMessageListener(new MessageListener()
{
- Message msg = cons.receiveNoWait();
- if (msg == null)
+
+ public void onMessage(Message message)
{
- break;
+
+ Message msg = message;
+ try
+ {
+ count.countDown();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ _session.rollback();
+ }
+ catch (JMSException e)
+ {
+ System.out.println("Error:" + e.getMessage());
+ exceptions[(int)count.getCount()] = e;
+ }
+ catch (AssertionFailedError cf)
+ {
+ // End Test if Equality test fails
+ while (count.getCount() != 0)
+ {
+ count.countDown();
+ }
+
+ System.out.println("Error:" + cf.getMessage());
+ System.err.println(cf.getMessage());
+ cf.printStackTrace();
+ failed.set(true);
+ }
}
- else
+ });
+ //Start the session now so we
+ _connection.start();
+
+ count.await();
+
+ for (Exception e : exceptions)
+ {
+ if (e != null)
{
- msg.acknowledge();
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ failed.set(true);
}
}
- ssn.commit();
+
+// _consumer.close();
+ _connection.close();
+
+ assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
+ }
+
+ @Override public void tearDown() throws Exception
+ {
+
+ drainQueue(_queue);
+
super.tearDown();
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
index d76d3858b3..1ec39bd1e0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -22,9 +22,7 @@ package org.apache.qpid.test.client.message;
import java.util.concurrent.CountDownLatch;
-import javax.jms.Connection;
import javax.jms.DeliveryMode;
-import javax.jms.Destination;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -35,15 +33,12 @@ import javax.jms.Session;
import junit.framework.Assert;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +48,6 @@ public class SelectorTest extends QpidTestCase implements MessageListener
private AMQConnection _connection;
private AMQDestination _destination;
- private AMQSession _session;
private int count;
public String _connectionString = "vm://:1";
private static final String INVALID_SELECTOR = "Cost LIKE 5";
@@ -66,40 +60,12 @@ public class SelectorTest extends QpidTestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- BasicConfigurator.configure();
init((AMQConnection) getConnection("guest", "guest"));
}
- protected void tearDown() throws Exception
- {
- super.tearDown();
- if (_session != null)
- {
- try
- {
- _session.close();
- }
- catch (JMSException e)
- {
- fail("Error cleaning up:" + e.getMessage());
- }
- }
- if (_connection != null)
- {
- try
- {
- _connection.close();
- }
- catch (JMSException e)
- {
- fail("Error cleaning up:" + e.getMessage());
- }
- }
- }
-
private void init(AMQConnection connection) throws JMSException
{
- init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true));
+ init(connection, new AMQQueue(connection, getTestQueueName(), true));
}
private void init(AMQConnection connection, AMQDestination destination) throws JMSException
@@ -107,23 +73,27 @@ public class SelectorTest extends QpidTestCase implements MessageListener
_connection = connection;
_destination = destination;
connection.start();
+ }
+
+ public void onMessage(Message message)
+ {
+ count++;
+ _logger.info("Got Message:" + message);
+ _responseLatch.countDown();
+ }
- String selector = null;
- selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
+ public void testUsingOnMessage() throws Exception
+ {
+ String selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
- _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
// _session.createConsumer(destination).setMessageListener(this);
- _session.createConsumer(destination, selector).setMessageListener(this);
- }
+ session.createConsumer(_destination, selector).setMessageListener(this);
- public void test() throws Exception
- {
try
{
- init((AMQConnection) getConnection("guest", "guest", randomize("Client")));
-
- Message msg = _session.createTextMessage("Message");
+ Message msg = session.createTextMessage("Message");
msg.setJMSPriority(1);
msg.setIntProperty("Cost", 2);
msg.setStringProperty("property-with-hyphen", "wibble");
@@ -131,7 +101,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener
_logger.info("Sending Message:" + msg);
- ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
+ ((BasicMessageProducer) session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
_logger.info("Message sent, waiting for response...");
_responseLatch.await();
@@ -163,40 +133,18 @@ public class SelectorTest extends QpidTestCase implements MessageListener
{
_logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
}
- catch (URLSyntaxException e)
- {
- _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage());
- fail("Wrong exception");
- }
- catch (AMQException e)
- {
- _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage());
- fail("Wrong exception");
- }
- finally
- {
- if (_session != null)
- {
- _session.close();
- }
- if (_connection != null)
- {
- _connection.close();
- }
- }
}
public void testUnparsableSelectors() throws Exception
{
- Connection connection = getConnection("guest", "guest", randomize("Client"));
- _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ AMQSession session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
boolean caught = false;
//Try Creating a Browser
try
{
- _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR);
+ session.createBrowser(session.createQueue("Ping"), INVALID_SELECTOR);
}
catch (JMSException e)
{
@@ -213,7 +161,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener
//Try Creating a Consumer
try
{
- _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR);
+ session.createConsumer(session.createQueue("Ping"), INVALID_SELECTOR);
}
catch (JMSException e)
{
@@ -230,7 +178,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener
//Try Creating a Receiever
try
{
- _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR);
+ session.createReceiver(session.createQueue("Ping"), INVALID_SELECTOR);
}
catch (JMSException e)
{
@@ -246,7 +194,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener
try
{
- _session.createReceiver(_session.createQueue("Ping"), BAD_MATHS_SELECTOR);
+ session.createReceiver(session.createQueue("Ping"), BAD_MATHS_SELECTOR);
}
catch (JMSException e)
{
@@ -264,9 +212,10 @@ public class SelectorTest extends QpidTestCase implements MessageListener
public void testRuntimeSelectorError() throws JMSException
{
- MessageConsumer consumer = _session.createConsumer(_destination , "testproperty % 5 = 1");
- MessageProducer producer = _session.createProducer(_destination);
- Message sentMsg = _session.createTextMessage();
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1");
+ MessageProducer producer = session.createProducer(_destination);
+ Message sentMsg = session.createTextMessage();
sentMsg.setIntProperty("testproperty", 1); // 1 % 5
producer.send(sentMsg);
@@ -289,9 +238,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener
public void testSelectorWithJMSMessageID() throws Exception
{
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer prod = session.createProducer(_destination);
MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL");
@@ -332,18 +279,6 @@ public class SelectorTest extends QpidTestCase implements MessageListener
Assert.assertNotNull("Msg5 should not be null", msg5);
}
- public void onMessage(Message message)
- {
- count++;
- _logger.info("Got Message:" + message);
- _responseLatch.countDown();
- }
-
- private static String randomize(String in)
- {
- return in + System.currentTimeMillis();
- }
-
public static void main(String[] argv) throws Exception
{
SelectorTest test = new SelectorTest();
@@ -357,7 +292,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener
{
test.setUp();
}
- test.test();
+ test.testUsingOnMessage();
if (test._connectionString.contains("vm://:1"))
{
@@ -371,9 +306,4 @@ public class SelectorTest extends QpidTestCase implements MessageListener
e.printStackTrace();
}
}
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(SelectorTest.class);
- }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
index 1f90f1e29f..6b69ccab6c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Destination;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
@@ -105,7 +106,8 @@ public class JMSPropertiesTest extends QpidTestCase
assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType());
assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo());
assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:"));
-
+ assertEquals("JMS Default priority should be 4",Message.DEFAULT_PRIORITY,rm.getJMSPriority());
+
//Validate that the JMSX values are correct
assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID"));
assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq"));
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 1bef07fcd5..440bc31fe9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -22,13 +22,11 @@ package org.apache.qpid.test.utils;
import javax.jms.Connection;
-import org.apache.qpid.util.FileUtils;
-
public class FailoverBaseCase extends QpidTestCase
{
public static int FAILING_VM_PORT = 2;
- public static int FAILING_PORT = DEFAULT_PORT + 100;
+ public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt"));
protected int failingPort;
@@ -54,7 +52,7 @@ public class FailoverBaseCase extends QpidTestCase
protected void setUp() throws java.lang.Exception
{
super.setUp();
- setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
+ setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK")+"/"+getFailingPort());
startBroker(failingPort);
}
@@ -78,7 +76,6 @@ public class FailoverBaseCase extends QpidTestCase
{
stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT);
super.tearDown();
- FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index bed5d3242a..666c97c9de 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.store.DerbyMessageStore;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,11 +71,14 @@ public class QpidTestCase extends TestCase
protected final String QpidHome = System.getProperty("QPID_HOME");
protected File _configFile = new File(System.getProperty("broker.config"));
- private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
+ protected static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
protected long RECEIVE_TIMEOUT = 1000l;
- private Map<String, String> _setProperties = new HashMap<String, String>();
+ private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>();
+ private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>();
+ private Map<org.apache.log4j.Logger, Level> _loggerLevelSetForTest = new HashMap<org.apache.log4j.Logger, Level>();
+
private XMLConfiguration _testConfiguration = new XMLConfiguration();
/**
@@ -147,6 +151,7 @@ public class QpidTestCase extends TestCase
private static final String BROKER_LANGUAGE = "broker.language";
private static final String BROKER = "broker";
private static final String BROKER_CLEAN = "broker.clean";
+ private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests";
private static final String BROKER_VERSION = "broker.version";
protected static final String BROKER_READY = "broker.ready";
private static final String BROKER_STOPPED = "broker.stopped";
@@ -169,6 +174,7 @@ public class QpidTestCase extends TestCase
protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA);
protected String _broker = System.getProperty(BROKER, VM);
private String _brokerClean = System.getProperty(BROKER_CLEAN, null);
+ private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
private String _output = System.getProperty(TEST_OUTPUT);
@@ -187,6 +193,8 @@ public class QpidTestCase extends TestCase
public static final String TOPIC = "topic";
/** Map to hold test defined environment properties */
private Map<String, String> _env;
+ protected static final String INDEX = "index";
+ ;
public QpidTestCase(String name)
{
@@ -235,6 +243,19 @@ public class QpidTestCase extends TestCase
{
_logger.error("exception stopping broker", e);
}
+
+ if(_brokerCleanBetweenTests)
+ {
+ try
+ {
+ cleanBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("exception cleaning up broker", e);
+ }
+ }
+
_logger.info("========== stop " + _testName + " ==========");
if (redirected)
@@ -451,6 +472,15 @@ public class QpidTestCase extends TestCase
env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\"");
env.put("QPID_WORK", System.getProperty("QPID_WORK"));
+
+ // Use the environment variable to set amqj.logging.level for the broker
+ // The value used is a 'server' value in the test configuration to
+ // allow a differentiation between the client and broker logging levels.
+ if (System.getProperty("amqj.server.logging.level") != null)
+ {
+ setBrokerEnvironment("AMQJ_LOGGING_LEVEL", System.getProperty("amqj.server.logging.level"));
+ }
+
// Add all the environment settings the test requested
if (!_env.isEmpty())
{
@@ -460,13 +490,27 @@ public class QpidTestCase extends TestCase
}
}
+
+ // Add default test logging levels that are used by the log4j-test
+ // Use the convenience methods to push the current logging setting
+ // in to the external broker's QPID_OPTS string.
+ if (System.getProperty("amqj.protocol.logging.level") != null)
+ {
+ setSystemProperty("amqj.protocol.logging.level");
+ }
+ if (System.getProperty("root.logging.level") != null)
+ {
+ setSystemProperty("root.logging.level");
+ }
+
+
String QPID_OPTS = " ";
// Add all the specified system properties to QPID_OPTS
- if (!_setProperties.isEmpty())
+ if (!_propertiesSetForBroker.isEmpty())
{
- for (String key : _setProperties.keySet())
+ for (String key : _propertiesSetForBroker.keySet())
{
- QPID_OPTS += "-D" + key + "=" + System.getProperty(key) + " ";
+ QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " ";
}
if (env.containsKey("QPID_OPTS"))
@@ -489,7 +533,7 @@ public class QpidTestCase extends TestCase
if (!p.await(30, TimeUnit.SECONDS))
{
- _logger.info("broker failed to become ready:" + p.getStopLine());
+ _logger.info("broker failed to become ready (" + p.ready + "):" + p.getStopLine());
//Ensure broker has stopped
process.destroy();
cleanBroker();
@@ -667,28 +711,87 @@ public class QpidTestCase extends TestCase
}
/**
+ * Set a System property that is to be applied only to the external test
+ * broker.
+ *
+ * This is a convenience method to enable the setting of a -Dproperty=value
+ * entry in QPID_OPTS
+ *
+ * This is only useful for the External Java Broker tests.
+ *
+ * @param property the property name
+ * @param value the value to set the property to
+ */
+ protected void setBrokerOnlySystemProperty(String property, String value)
+ {
+ if (!_propertiesSetForBroker.containsKey(property))
+ {
+ _propertiesSetForBroker.put(property, value);
+ }
+
+ }
+
+ /**
+ * Set a System (-D) property for this test run.
+ *
+ * This convenience method copies the current VMs System Property
+ * for the external VM Broker.
+ *
+ * @param property the System property to set
+ */
+ protected void setSystemProperty(String property)
+ {
+ setSystemProperty(property, System.getProperty(property));
+ }
+
+ /**
* Set a System property for the duration of this test.
*
* When the test run is complete the value will be reverted.
*
+ * The values set using this method will also be propogated to the external
+ * Java Broker via a -D value defined in QPID_OPTS.
+ *
+ * If the value should not be set on the broker then use
+ * setTestClientSystemProperty().
+ *
* @param property the property to set
* @param value the new value to use
*/
protected void setSystemProperty(String property, String value)
{
- if (!_setProperties.containsKey(property))
+ // Record the value for the external broker
+ _propertiesSetForBroker.put(property, value);
+
+ //Set the value for the test client vm aswell.
+ setTestClientSystemProperty(property, value);
+ }
+
+ /**
+ * Set a System (-D) property for the external Broker of this test.
+ *
+ * @param property The property to set
+ * @param value the value to set it to.
+ */
+ protected void setTestClientSystemProperty(String property, String value)
+ {
+ if (!_propertiesSetForTestOnly.containsKey(property))
{
- _setProperties.put(property, System.getProperty(property));
- }
+ // Record the current value so we can revert it later.
+ _propertiesSetForTestOnly.put(property, System.getProperty(property));
+ }
System.setProperty(property, value);
}
+ /**
+ * Restore the System property values that were set before this test run.
+ */
protected void revertSystemProperties()
{
- for (String key : _setProperties.keySet())
+ for (String key : _propertiesSetForTestOnly.keySet())
{
- String value = _setProperties.get(key);
+ String value = _propertiesSetForTestOnly.get(key);
if (value != null)
{
System.setProperty(key, value);
@@ -698,6 +801,12 @@ public class QpidTestCase extends TestCase
System.clearProperty(key);
}
}
+
+ _propertiesSetForTestOnly.clear();
+
+ // We don't change the current VMs settings for Broker only properties
+ // so we can just clear this map
+ _propertiesSetForBroker.clear();
}
/**
@@ -712,6 +821,40 @@ public class QpidTestCase extends TestCase
}
/**
+ * Adjust the VMs Log4j Settings just for this test run
+ *
+ * @param logger the logger to change
+ * @param level the level to set
+ */
+ protected void setLoggerLevel(org.apache.log4j.Logger logger, Level level)
+ {
+ assertNotNull("Cannot set level of null logger", logger);
+ assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level);
+
+ if (!_loggerLevelSetForTest.containsKey(logger))
+ {
+ // Record the current value so we can revert it later.
+ _loggerLevelSetForTest.put(logger, logger.getLevel());
+ }
+
+ logger.setLevel(level);
+ }
+
+ /**
+ * Restore the logging levels defined by this test.
+ */
+ protected void revertLoggingLevels()
+ {
+ for (org.apache.log4j.Logger logger : _loggerLevelSetForTest.keySet())
+ {
+ logger.setLevel(_loggerLevelSetForTest.get(logger));
+ }
+
+ _loggerLevelSetForTest.clear();
+
+ }
+
+ /**
* Check whether the broker is an 0.8
*
* @return true if the broker is an 0_8 version, false otherwise.
@@ -885,6 +1028,7 @@ public class QpidTestCase extends TestCase
}
revertSystemProperties();
+ revertLoggingLevels();
}
/**
@@ -961,7 +1105,12 @@ public class QpidTestCase extends TestCase
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
- return session.createMessage();
+ Message message = session.createMessage();
+
+ message.setIntProperty(INDEX, msgCount);
+
+ return message;
+
}
public ConnectionURL getConnectionURL() throws NamingException
diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes
index f03d62667d..488ae24022 100644..100755
--- a/qpid/java/test-profiles/010Excludes
+++ b/qpid/java/test-profiles/010Excludes
@@ -92,3 +92,10 @@ org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
// QPID-2084 : this test needs more work for 0-10
org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#*
+
+// QPID-2118 : 0-10 Java client has differrent error handling to 0-8 code path
+org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
+
+//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker)
+org.apache.qpid.server.queue.ProducerFlowControlTest#*
+
diff --git a/qpid/java/test-profiles/08Excludes b/qpid/java/test-profiles/08Excludes
index b277c6d929..6f3898384d 100644
--- a/qpid/java/test-profiles/08Excludes
+++ b/qpid/java/test-profiles/08Excludes
@@ -14,8 +14,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
// QPID-1823: this takes ages to run
org.apache.qpid.client.SessionCreateTest#*
-org.apache.qpid.test.client.RollbackOrderTest#*
-
// QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable
org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
org.apache.qpid.server.queue.ModelTest#*
diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes
index de876e06bb..ee781fb80f 100644
--- a/qpid/java/test-profiles/08StandaloneExcludes
+++ b/qpid/java/test-profiles/08StandaloneExcludes
@@ -39,8 +39,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
// QPID-1823: this takes ages to run
org.apache.qpid.client.SessionCreateTest#*
-org.apache.qpid.test.client.RollbackOrderTest#*
-
// This test requires the standard configuration file for validation.
// Excluding here does not reduce test coverage.
org.apache.qpid.server.configuration.ServerConfigurationFileTest#*
diff --git a/qpid/java/test-profiles/default.testprofile b/qpid/java/test-profiles/default.testprofile
index 86a5b2efb3..28127e5c4c 100644
--- a/qpid/java/test-profiles/default.testprofile
+++ b/qpid/java/test-profiles/default.testprofile
@@ -11,11 +11,14 @@ max_prefetch=1000
log=debug
amqj.logging.level=${log}
+amqj.server.logging.level=${log}
amqj.protocol.logging.level=${log}
root.logging.level=warn
log4j.configuration=file:///${test.profiles}/log4j-test.xml
log4j.debug=false
+# Note test-provider.properties also has variables of same name.
+# Keep in sync
test.port=15672
test.mport=18999
#Note : Management will start open second port on: mport + 100 : 19099
diff --git a/qpid/java/test-profiles/log4j-test.xml b/qpid/java/test-profiles/log4j-test.xml
index 0aaa7d8686..2d77942a81 100644
--- a/qpid/java/test-profiles/log4j-test.xml
+++ b/qpid/java/test-profiles/log4j-test.xml
@@ -29,10 +29,10 @@
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
+ <param name="ImmediateFlush" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%t %d %p [%c{4}] %m%n"/>
</layout>
- <param name="ImmediateFlush" value="true"/>
</appender>
<logger name="org.apache.qpid">
@@ -46,6 +46,10 @@
<logger name="org.apache.qpid.test.utils.QpidTestCase">
<level value="ALL"/>
</logger>
+
+ <logger name="org.apache.commons">
+ <level value="WARN"/>
+ </logger>
<root>
<level value="${root.logging.level}"/>
diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties
index e5cb18da0b..a349b0fcbf 100644
--- a/qpid/java/test-profiles/test-provider.properties
+++ b/qpid/java/test-profiles/test-provider.properties
@@ -19,10 +19,14 @@
#
#
-test.port=5672
-test.port.ssl=5671
-test.port.alt=5772
-test.port.alt.ssl=5771
+# Copied from default.testprofile
+test.port=15672
+test.mport=18999
+#Note : Java Management will start open second port on: mport + 100 : 19099
+test.port.ssl=15671
+test.port.alt=25672
+test.port.alt.ssl=25671
+
connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'