diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-05 13:29:44 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-05 13:29:44 +0000 |
commit | fd4934ce9155cb52315a46de4cf97d71ee1cefa1 (patch) | |
tree | a53e4b8145293c2ab71cfc01a208b28e8c809c4c | |
parent | f67026b148b3429ca9731bce94a0bd3ff2981ce5 (diff) | |
download | qpid-python-fd4934ce9155cb52315a46de4cf97d71ee1cefa1.tar.gz |
Merged from trunk up to r820933
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@821793 13f79535-47bb-0310-9956-ffa450edef68
45 files changed, 1325 insertions, 273 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index c4c73842bc..4f24f7ddc0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -27,13 +27,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; @@ -42,12 +41,8 @@ import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.UnauthorizedAccessException; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.queue.*; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; @@ -60,6 +55,7 @@ import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; @@ -120,6 +116,11 @@ public class AMQChannel private final AMQProtocolSession _session; private boolean _closing; + private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + + private final AtomicBoolean _blocking = new AtomicBoolean(false); + + private LogActor _actor; private LogSubject _logSubject; @@ -798,16 +799,32 @@ public class AMQChannel return _unacknowledgedMessageMap; } - + /** + * Called from the ChannelFlowHandler to suspend this Channel + * @param suspended boolean, should this Channel be suspended + */ public void setSuspended(boolean suspended) { - - boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { - _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started")); + // Log Flow Started before we start the subscriptions + if (!suspended) + { + _actor.message(_logSubject, ChannelMessages.CHN_1002("Started")); + } + + + // This section takes two different approaches to perform to perform + // the same function. Ensuring that the Subscription has taken note + // of the change in Channel State + // Here we have become unsuspended and so we ask each the queue to + // perform an Async delivery for each of the subscriptions in this + // Channel. The alternative would be to ensure that the subscription + // had received the change in suspension state. That way the logic + // behind decieding to start an async delivery was located with the + // Subscription. if (wasSuspended) { // may need to deliver queued messages @@ -816,6 +833,38 @@ public class AMQChannel s.getQueue().deliverAsync(s); } } + + + // Here we have become suspended so we need to ensure that each of + // the Subscriptions has noticed this change so that we can be sure + // they are not still sending messages. Again the code here is a + // very simplistic approach to ensure that the change of suspension + // has been noticed by each of the Subscriptions. Unlike the above + // case we don't actually need to do anything else. + if (!wasSuspended) + { + // may need to deliver queued messages + for (Subscription s : _tag2SubscriptionMap.values()) + { + try + { + s.getSendLock(); + } + finally + { + s.releaseSendLock(); + } + } + } + + + // Log Suspension only after we have confirmed all suspensions are + // stopped. + if (suspended) + { + _actor.message(_logSubject, ChannelMessages.CHN_1002("Stopped")); + } + } } @@ -967,4 +1016,37 @@ public class AMQChannel { return _actor; } + + public void block(AMQQueue queue) + { + if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + { + + if(_blocking.compareAndSet(false,true)) + { + _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString())); + flow(false); + } + } + } + + public void unblock(AMQQueue queue) + { + if(_blockingQueues.remove(queue)) + { + if(_blocking.compareAndSet(true,false)) + { + _actor.message(_logSubject, ChannelMessages.CHN_1006()); + + flow(true); + } + } + } + + private void flow(boolean flow) + { + MethodRegistry methodRegistry = _session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); + _session.writeFrame(responseBody.generateFrame(_channelId)); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 74bb7ee969..5c73e353de 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -108,4 +108,14 @@ public class QueueConfiguration return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap()); } + public long getCapacity() + { + return _config.getLong("capacity", _vHostConfig.getCapacity()); + } + + public long getFlowResumeCapacity() + { + return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity()); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index ed9d8acc08..01befbbfe8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -103,6 +103,8 @@ public class ServerConfiguration implements SignalHandler envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth"); envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize"); envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap"); + envVarMap.put("QPID_QUEUECAPACITY", "capacity"); + envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity"); envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer"); envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer"); envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay"); @@ -289,7 +291,6 @@ public class ServerConfiguration implements SignalHandler return conf; } - @Override public void handle(Signal arg0) { try @@ -507,6 +508,16 @@ public class ServerConfiguration implements SignalHandler return getConfig().getLong("minimumAlertRepeatGap", 0); } + public long getCapacity() + { + return getConfig().getLong("capacity", 0L); + } + + public long getFlowResumeCapacity() + { + return getConfig().getLong("flowResumeCapacity", getCapacity()); + } + public int getProcessors() { return getConfig().getInt("connector.processors", 4); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 0273a13262..6c72025ec2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -166,4 +166,15 @@ public class VirtualHostConfiguration return _config.getLong("queues.minimumAlertRepeatGap", 0); } + + public long getCapacity() + { + return _config.getLong("queues.capacity", 0l); + } + + public long getFlowResumeCapacity() + { + return _config.getLong("queues.flowResumeCapacity", getCapacity()); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java index 4502710dd6..0059a48c06 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java @@ -60,4 +60,9 @@ public abstract class AbstractActor implements LogActor return _rootLogger; } + public String toString() + { + return _logString; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java index 9b928accc0..5d2762fd1d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java @@ -36,4 +36,12 @@ public class BrokerActor extends AbstractActor _logString = "[Broker] "; } + + public BrokerActor(String name, RootMessageLogger logger) + { + super(logger); + + _logString = "[Broker(" + name + ")] "; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties index 5ced7cc0b9..9169a1a651 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties @@ -249,12 +249,16 @@ CHN-1003 = Close # 0 - bytes allowed in prefetch # 1 - number of messagse. CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number} +CHN-1005 = Flow Control Enforced (Queue {0}) +CHN-1006 = Flow Control Removed #Queue # 0 - owner # 1 - priority QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}] QUE-1002 = Deleted +QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number} +QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number} #Exchange # 0 - type @@ -269,4 +273,4 @@ BND-1002 = Deleted #Subscription SUB-1001 = Create[ : Durable][ : Arguments : {0}] SUB-1002 = Close -SUB-1003 = State : {0}
\ No newline at end of file +SUB-1003 = State : {0} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index b5f6711c93..7daa10578f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.PrincipalHolder; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -170,6 +171,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void setMinimumAlertRepeatGap(long value); + long getCapacity(); + + void setCapacity(long capacity); + + + long getFlowResumeCapacity(); + + void setFlowResumeCapacity(long flowResumeCapacity); + + + void deleteMessageFromTop(StoreContext storeContext) throws AMQException; long clearQueue(StoreContext storeContext) throws AMQException; @@ -194,6 +206,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> Map<String, Object> getArguments(); + void checkCapacity(AMQChannel channel); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription * already exists. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 71831efc7b..71ad05ce25 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -27,12 +27,104 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; +import java.util.HashMap; public class AMQQueueFactory { public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); + private abstract static class QueueProperty + { + + private final AMQShortString _argumentName; + + + public QueueProperty(String argumentName) + { + _argumentName = new AMQShortString(argumentName); + } + + public AMQShortString getArgumentName() + { + return _argumentName; + } + + + public abstract void setPropertyValue(AMQQueue queue, Object value); + + } + + private abstract static class QueueLongProperty extends QueueProperty + { + + public QueueLongProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).longValue()); + } + + } + + abstract void setPropertyValue(AMQQueue queue, long value); + + + } + + private static final QueueProperty[] DECLAREABLE_PROPERTIES = { + new QueueLongProperty("x-qpid-maximum-message-age") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageAge(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-size") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageSize(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-count") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageCount(value); + } + }, + new QueueLongProperty("x-qpid-minimum-alert-repeat-gap") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMinimumAlertRepeatGap(value); + } + }, + new QueueLongProperty("x-qpid-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setCapacity(value); + } + }, + new QueueLongProperty("x-qpid-flow-resume-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setFlowResumeCapacity(value); + } + } + + }; + + + public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, @@ -55,6 +147,18 @@ public class AMQQueueFactory //Register the new queue virtualHost.getQueueRegistry().registerQueue(q); q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString())); + + if(arguments != null) + { + for(QueueProperty p : DECLAREABLE_PROPERTIES) + { + if(arguments.containsKey(p.getArgumentName())) + { + p.setPropertyValue(q, arguments.get(p.getArgumentName())); + } + } + } + return q; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 67bc87145a..4f43695b13 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -46,7 +46,6 @@ public class QueueEntryImpl implements QueueEntry private MessageReference _message; - private Set<Subscription> _rejectedBy = null; private volatile EntryState _state = AVAILABLE_STATE; @@ -198,7 +197,6 @@ public class QueueEntryImpl implements QueueEntry _stateUpdater.set(this,AVAILABLE_STATE); } - public boolean immediateAndNotDelivered() { return getMessage().isImmediate() && !_deliveredToConsumer; @@ -419,4 +417,5 @@ public class QueueEntryImpl implements QueueEntry { return _queueEntryList; } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b040993d0b..ab5972b26f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -4,6 +4,8 @@ import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import javax.management.JMException; @@ -30,6 +32,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.QueueMessages; +import org.apache.qpid.server.AMQChannel; /* * @@ -123,6 +126,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final Executor _asyncDelivery; private final AtomicLong _totalMessagesReceived = new AtomicLong(); + private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); + /** max allowed size(KB) of a single message */ public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); @@ -149,6 +154,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private LogSubject _logSubject; private LogActor _logActor; + + private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); + private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); + protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { @@ -575,6 +584,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _deliveredMessages.incrementAndGet(); sub.send(entry); + setLastSeenEntry(sub,entry); } @@ -684,6 +694,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new FailedDequeueException(_name.toString(), e); } + checkCapacity(); + } private void decrementQueueSize(final QueueEntry entry) @@ -1233,11 +1245,61 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public void deliverAsync() + public void checkCapacity(AMQChannel channel) + { + if(_capacity != 0l) + { + if(_atomicQueueSize.get() > _capacity) + { + //Overfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity)); + + if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) + { + channel.block(this); + } + + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + + //Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + + channel.unblock(this); + _blockedChannels.remove(channel); + + } + + } + + + + } + } + + private void checkCapacity() { - _stateChangeCount.incrementAndGet(); + if(_capacity != 0L) + { + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + //Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + - Runner runner = new Runner(); + for(AMQChannel c : _blockedChannels.keySet()) + { + c.unblock(this); + _blockedChannels.remove(c); + } + } + } + } + + + public void deliverAsync() + { + Runner runner = new Runner(_stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1250,13 +1312,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(new SubFlushRunner(sub)); } + private class Runner implements ReadWriteRunnable { + String _name; + public Runner(long count) + { + _name = "QueueRunner-" + count + "-" + _logActor; + } + public void run() { + String originalName = Thread.currentThread().getName(); try { + Thread.currentThread().setName(_name); CurrentActor.set(_logActor); + processQueue(this); } catch (AMQException e) @@ -1266,9 +1338,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener finally { CurrentActor.remove(); + Thread.currentThread().setName(originalName); } - - } public boolean isRead() @@ -1280,6 +1351,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return true; } + + public String toString() + { + return _name; + } } private class SubFlushRunner implements ReadWriteRunnable @@ -1293,27 +1369,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void run() { - boolean complete = false; - try - { - CurrentActor.set(_sub.getLogActor()); - complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); - } - catch (AMQException e) - { - _logger.error(e); + String originalName = Thread.currentThread().getName(); + try{ + Thread.currentThread().setName("SubFlushRunner-"+_sub); + + boolean complete = false; + try + { + CurrentActor.set(_sub.getLogActor()); + complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); + + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + } + if (!complete && !_sub.isSuspended()) + { + _asyncDelivery.execute(this); + } } finally { - CurrentActor.remove(); - } - if (!complete && !_sub.isSuspended()) - { - _asyncDelivery.execute(this); + Thread.currentThread().setName(originalName); } - } public boolean isRead() @@ -1341,7 +1426,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { sub.getSendLock(); - atTail = attemptDelivery(sub); + atTail = attemptDelivery(sub); if (atTail && sub.isAutoClose()) { unregisterSubscription(sub); @@ -1371,44 +1456,48 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return atTail; } + /** + * Attempt delivery for the given subscription. + * + * Looks up the next node for the subscription and attempts to deliver it. + * + * @param sub + * @return true if we have completed all possible deliveries for this sub. + * @throws AMQException + */ private boolean attemptDelivery(Subscription sub) throws AMQException { boolean atTail = false; - boolean subActive = sub.isActive(); + boolean subActive = sub.isActive() && !sub.isSuspended(); if (subActive) { - QueueEntry node = getNextAvailableEntry(sub); if (node != null && !(node.isAcquired() || node.isDeleted())) { - if (!sub.isSuspended()) + if (sub.hasInterest(node)) { - if (sub.hasInterest(node)) + if (!sub.wouldSuspend(node)) { - if (!sub.wouldSuspend(node)) + if (sub.acquires() && !node.acquire(sub)) { - if (sub.acquires() && !node.acquire(sub)) - { - sub.onDequeue(node); - } - else - { - deliverMessage(sub, node); - } - + sub.onDequeue(node); } - else // Not enough Credit for message and wouldSuspend + else { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - node.addStateChangeListener(new QueueEntryListener(sub, node)); + deliverMessage(sub, node); } - } + } + else // Not enough Credit for message and wouldSuspend + { + //QPID-1187 - Treat the subscription as suspended for this message + // and wait for the message to be removed to continue delivery. + subActive = false; + node.addStateChangeListener(new QueueEntryListener(sub, node)); + } } } @@ -1488,6 +1577,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asynchronousRunner.compareAndSet(runner, null); + // For every message enqueue/requeue the we fire deliveryAsync() which + // increases _stateChangeCount. If _sCC changes whilst we are in our loop + // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) + // then we will continue to run for a maximum of iterations. + // So whilst delivery/rejection is going on a processQueue thread will be running while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) { // we want to have one extra loop after every subscription has reached the point where it cannot move @@ -1507,7 +1601,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //iterate over the subscribers and try to advance their pointer while (subscriptionIter.advance()) { - boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); sub.getSendLock(); try @@ -1551,6 +1644,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rescheduling runner:" + runner); + } _asyncDelivery.execute(runner); } } @@ -1663,6 +1760,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public long getCapacity() + { + return _capacity; + } + + public void setCapacity(long capacity) + { + _capacity = capacity; + } + + public long getFlowResumeCapacity() + { + return _flowResumeCapacity; + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + _flowResumeCapacity = flowResumeCapacity; + } + + public Set<NotificationCheck> getNotificationChecks() { return _notificationChecks; @@ -1732,6 +1850,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setMaximumMessageSize(config.getMaximumMessageSize()); setMaximumMessageCount(config.getMaximumMessageCount()); setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + _capacity = config.getCapacity(); + _flowResumeCapacity = config.getFlowResumeCapacity(); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index b6137e83de..7511b5c818 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -102,7 +102,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { - instance.initialise(); + instance.initialise(instanceID); } catch (Exception e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index a049d1eb09..831f928832 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -42,18 +42,22 @@ import java.io.File; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { + private String _registryName; public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException { super(new ServerConfiguration(configurationURL)); } - public void initialise() throws Exception + public void initialise(int instanceID) throws Exception { _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger()); + + _registryName = String.valueOf(instanceID); + // Set the Actor for current log messages - CurrentActor.set(new BrokerActor(_rootMessageLogger)); + CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger)); CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion())); @@ -83,7 +87,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry public void close() throws Exception { //Set the Actor for Broker Shutdown - CurrentActor.set(new BrokerActor(_rootMessageLogger)); + CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger)); try { super.close(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index ddb3fce5d2..92accb3499 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -43,8 +43,9 @@ public interface IApplicationRegistry * Initialise the application registry. All initialisation must be done in this method so that any components * that need access to the application registry itself for initialisation are able to use it. Attempting to * initialise in the constructor will lead to failures since the registry reference will not have been set. + * @param instanceID the instanceID that we can use to identify this AR. */ - void initialise() throws Exception; + void initialise(int instanceID) throws Exception; /** * Shutdown this Registry diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index ebec6af9f0..27a6f14b03 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -99,6 +99,7 @@ public class LocalTransactionalContext implements TransactionalContext { StoreContext.setCurrentContext(getStoreContext()); QueueEntry entry = _queue.enqueue(_message); + _queue.checkCapacity(_channel); if(entry.immediateAndNotDelivered()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 76cb7a397d..dd461bad47 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -96,6 +96,7 @@ public class NonTransactionalContext implements TransactionalContext StoreContext.clearCurrentContext(); + queue.checkCapacity(_channel); //following check implements the functionality //required by the 'immediate' flag: diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 8e1a6d2348..1d8400f736 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.PrincipalHolder; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; @@ -280,6 +281,15 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public boolean getBlockOnQueueFull() + { + return false; + } + + public void setBlockOnQueueFull(boolean block) + { + } + public long getMinimumAlertRepeatGap() { return 0; //To change body of implemented methods use File | Settings | File Templates. @@ -295,7 +305,6 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - public void checkMessageStatus() throws AMQException { //To change body of implemented methods use File | Settings | File Templates. @@ -336,6 +345,10 @@ public class MockAMQQueue implements AMQQueue return null; //To change body of implemented methods use File | Settings | File Templates. } + public void checkCapacity(AMQChannel channel) + { + } + public ManagedObject getManagedObject() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -346,12 +359,31 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - public void setMinimumAlertRepeatGap(long value) { } + public long getCapacity() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setCapacity(long capacity) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public long getFlowResumeCapacity() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void configure(QueueConfiguration config) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java index e75ed640aa..4c8ff01be0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.registry; import junit.framework.TestCase; import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.AMQException; import java.security.Security; import java.security.Provider; @@ -69,7 +68,7 @@ public class ApplicationRegistryShutdownTest extends TestCase // Register new providers try { - _registry.initialise(); + _registry.initialise(ApplicationRegistry.DEFAULT_INSTANCE); } catch (Exception e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 8fef8baa02..705e7d2ad7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -50,7 +50,7 @@ public class NullApplicationRegistry extends ApplicationRegistry super(new ServerConfiguration(new PropertiesConfiguration())); } - public void initialise() throws Exception + public void initialise(int instanceID) throws Exception { _logger.info("Initialising NullApplicationRegistry"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 43948c05c4..7b7c86bb80 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.util; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -31,7 +30,6 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.access.ACLManager; -import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.plugins.AllowAll; import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; @@ -45,7 +43,6 @@ import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger; import java.util.Collection; -import java.util.HashMap; import java.util.Properties; import java.util.Arrays; @@ -75,7 +72,7 @@ public class TestApplicationRegistry extends ApplicationRegistry _config = config; } - public void initialise() throws Exception + public void initialise(int instanceID) throws Exception { _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger()); diff --git a/qpid/java/client/src/main/java/log4j.xml b/qpid/java/client/src/main/java/log4j.xml new file mode 100644 index 0000000000..c27acba818 --- /dev/null +++ b/qpid/java/client/src/main/java/log4j.xml @@ -0,0 +1,36 @@ +<!-- + + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - +--> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <param name="Target" value="System.out"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%-5p %c{1} - %m%n"/> + </layout> + </appender> + + <logger name="org.apache.qpid"> + <level value="warn"/> + <appender-ref ref="console" /> + </logger> + +</log4j:configuration> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0d2adcec8a..5659ce0474 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -450,7 +450,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - // use the defaul value set for all connections + // use the default value set for all connections _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); } @@ -512,7 +512,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect boolean retryAllowed = true; Exception connectionException = null; - while (!_connected && retryAllowed) + while (!_connected && retryAllowed && brokerDetails != null) { ProtocolVersion pe = null; try @@ -691,12 +691,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean attemptReconnection() { - while (_failoverPolicy.failoverAllowed()) + BrokerDetails broker = null; + while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) { try { - makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); - + makeBrokerConnection(broker); return true; } catch (Exception e) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2e3e417c95..fa15df34ec 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -91,6 +91,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; +import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +115,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -198,16 +200,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * The default value for immediate flag used by producers created by this session is false. That is, a consumer does * not need to be attached to a queue. */ - protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); /** * The default value for mandatory flag used by producers created by this session is true. That is, server will not * silently drop messages where no queue is connected to the exchange for the message. */ - protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + + protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false")); + + /** + * The period to wait while flow controlled before sending a log message confirming that the session is still + * waiting on flow control being revoked + */ + protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + + /** + * The period to wait while flow controlled before declaring a failure + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; + protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure", + DEFAULT_FLOW_CONTROL_WAIT_FAILURE); protected final boolean DECLARE_QUEUES = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); + protected final boolean DECLARE_EXCHANGES = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); @@ -778,7 +796,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e); } catch (FailoverException e) { @@ -1559,6 +1577,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic suspendChannel(true); } + // Let the dispatcher know that all the incomming messages + // should be rolled back(reject/release) + _rollbackMark.set(_highestDeliveryTag.get()); + + syncDispatchQueue(); + + _dispatcher.rollback(); + releaseForRollback(); sendRollback(); @@ -1851,26 +1877,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void failoverPrep() { - startDispatcherIfNecessary(); syncDispatchQueue(); } void syncDispatchQueue() { - final CountDownLatch signal = new CountDownLatch(1); - _queue.add(new Dispatchable() { - public void dispatch(AMQSession ssn) + if (Thread.currentThread() == _dispatcherThread) + { + while (!_closed.get() && !_queue.isEmpty()) { - signal.countDown(); + Dispatchable disp; + try + { + disp = (Dispatchable) _queue.take(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + // Check just in case _queue becomes empty, it shouldn't but + // better than an NPE. + if (disp == null) + { + _logger.debug("_queue became empty during sync."); + break; + } + + disp.dispatch(AMQSession.this); } - }); - try - { - signal.await(); } - catch (InterruptedException e) + else { - throw new RuntimeException(e); + startDispatcherIfNecessary(); + + final CountDownLatch signal = new CountDownLatch(1); + + _queue.add(new Dispatchable() + { + public void dispatch(AMQSession ssn) + { + signal.countDown(); + } + }); + + try + { + signal.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } } @@ -2233,7 +2291,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { - return createProducerImpl(destination, mandatory, immediate, false); + return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND); } private P createProducerImpl(final Destination destination, final boolean mandatory, @@ -2704,15 +2762,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void setFlowControl(final boolean active) { _flowControl.setFlowControl(active); + _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); } - public void checkFlowControl() throws InterruptedException + public void checkFlowControl() throws InterruptedException, JMSException { + long expiryTime = 0L; synchronized (_flowControl) { - while (!_flowControl.getFlowControl()) + while (!_flowControl.getFlowControl() && + (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE) + : expiryTime) >= System.currentTimeMillis() ) + { + + _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); + _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + } + if(!_flowControl.getFlowControl()) { - _flowControl.wait(); + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 0644bd88a8..1587d6a6bf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -414,9 +414,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void releaseForRollback() { - startDispatcherIfNecessary(); - syncDispatchQueue(); - _dispatcher.rollback(); getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); _txRangeSet.clear(); _txSize = 0; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index d7196c0abb..bc1453beaf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -195,6 +195,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public void releaseForRollback() { + // Reject all the messages that have been received in this session and + // have not yet been acknowledged. Should look to remove + // _deliveredMessageTags and use _txRangeSet as used by 0-10. + // Otherwise messages will be able to arrive out of order to a second + // consumer on the queue. Whilst this is within the JMS spec it is not + // user friendly and avoidable. while (true) { Long tag = _deliveredMessageTags.poll(); @@ -205,11 +211,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B rejectMessage(tag, true); } - - if (_dispatcher != null) - { - _dispatcher.rollback(); - } } public void rejectMessage(long deliveryTag, boolean requeue) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 5ff6066ddc..44ce59975a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -60,7 +60,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac /** * Priority of messages created by this producer. */ - private int _messagePriority; + private int _messagePriority = Message.DEFAULT_PRIORITY; /** * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 9c791730ca..0e3333940a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -39,6 +39,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance(); private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); @@ -159,7 +160,8 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException { - return false; + _channelFlowMethodHandler.methodReceived(_session, body, channelId); + return true; } public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java index e05a7ab6e2..960661daea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -189,7 +189,8 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener { synchronized (_brokerListLock) { - return _connectionDetails.getBrokerDetails(_currentBrokerIndex); + _currentBrokerDetail = _connectionDetails.getBrokerDetails(_currentBrokerIndex); + return _currentBrokerDetail; } } @@ -214,7 +215,15 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener broker.getHost().equals(_currentBrokerDetail.getHost()) && broker.getPort() == _currentBrokerDetail.getPort()) { - return getNextBrokerDetails(); + if (_connectionDetails.getBrokerCount() > 1) + { + return getNextBrokerDetails(); + } + else + { + _failedAttemps ++; + return null; + } } String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY); diff --git a/qpid/java/common/bin/qpid-run b/qpid/java/common/bin/qpid-run index 0b5070d937..63bb648fd8 100755 --- a/qpid/java/common/bin/qpid-run +++ b/qpid/java/common/bin/qpid-run @@ -111,6 +111,7 @@ if [ -n "$QPID_LOG_SUFFIX" ]; then fi log $INFO System Properties set to $SYSTEM_PROPS +log $INFO QPID_OPTS set to $QPID_OPTS program=$(basename $0) sourced=${BASH_SOURCE[0]} diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 0c32414647..5796af928a 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -261,6 +261,7 @@ <jvmarg value="${jvm.args}"/> <sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/> + <sysproperty key="amqj.server.logging.level" value="${amqj.server.logging.level}"/> <sysproperty key="amqj.protocol.logging.level" value="${amqj.protocol.logging.level}"/> <sysproperty key="log4j.debug" value="${log4j.debug}"/> <sysproperty key="root.logging.level" value="${root.logging.level}"/> @@ -269,6 +270,7 @@ <sysproperty key="java.naming.provider.url" value="${java.naming.provider.url}"/> <sysproperty key="broker" value="${broker}"/> <sysproperty key="broker.clean" value="${broker.clean}"/> + <sysproperty key="broker.clean.between.tests" value="${broker.clean.between.tests}"/> <sysproperty key="broker.version" value="${broker.version}"/> <sysproperty key="broker.ready" value="${broker.ready}" /> <sysproperty key="broker.stopped" value="${broker.stopped}" /> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java index 52120019f5..e7975f8d24 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java @@ -78,15 +78,23 @@ public class BrokerStartupTest extends AbstractTestLogging // Add an invalid value _broker += " -l invalid"; - // The release-bin build of the broker uses this log4j configuration - // so set up the broker environment to use it for this test. - // Also include -Dlog4j.debug so we can validate that it picked up this config - setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug -Dlog4j.configuration=file:" + System.getProperty(QPID_HOME) + "/../broker/src/main/java/log4j.properties"); + // The broker has a built in default log4j configuration set up + // so if the the broker cannot load the -l value it will use default + // use this default. Test that this is correctly loaded, by + // including -Dlog4j.debug so we can validate. + setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug"); // Disable all client logging so we can test for broker DEBUG only. - Logger.getRootLogger().setLevel(Level.WARN); - Logger.getLogger("qpid.protocol").setLevel(Level.WARN); - Logger.getLogger("org.apache.qpid").setLevel(Level.WARN); + setLoggerLevel(Logger.getRootLogger(), Level.WARN); + setLoggerLevel(Logger.getLogger("qpid.protocol"), Level.WARN); + setLoggerLevel(Logger.getLogger("org.apache.qpid"), Level.WARN); + + // Set the broker to use info level logging, which is the qpid-server + // default. Rather than debug which is the test default. + setBrokerOnlySystemProperty("amqj.server.logging.level", "info"); + // Set the logging defaults to info for this test. + setBrokerOnlySystemProperty("amqj.logging.level", "info"); + setBrokerOnlySystemProperty("root.logging.level", "info"); startBroker(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java index 4f50aba61d..b00a71315e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java @@ -160,7 +160,7 @@ public class BrokerLoggingTest extends AbstractTestLogging // Set the broker.ready string to check for the _log4j default that // is still present on standard out. - System.setProperty(BROKER_READY, "Qpid Broker Ready"); + setTestClientSystemProperty(BROKER_READY, "Qpid Broker Ready"); startBroker(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java new file mode 100644 index 0000000000..97147904e1 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -0,0 +1,324 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; + +import javax.jms.*; +import javax.naming.NamingException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class ProducerFlowControlTest extends QpidTestCase +{ + private static final int TIMEOUT = 1500; + + + private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class); + + protected final String QUEUE = "ProducerFlowControl"; + + private static final int MSG_COUNT = 50; + + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + private Session consumerSession; + + + private MessageConsumer consumer; + private final AtomicInteger _sentMessages = new AtomicInteger(); + + protected void setUp() throws Exception + { + super.setUp(); + + producerConnection = getConnection(); + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + producerConnection.start(); + + consumerConnection = getConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + protected void tearDown() throws Exception + { + producerConnection.close(); + consumerConnection.close(); + super.tearDown(); + } + + public void testCapacityExceededCausesBlock() + throws JMSException, NamingException, AMQException, InterruptedException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get()); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message not sent after two messages received", 5, _sentMessages.get()); + + } + + + public void testFlowControlOnCapacityResumeEqual() + throws JMSException, NamingException, AMQException, InterruptedException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",1000); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get()); + + + } + + + public void testFlowControlSoak() + throws Exception, NamingException, AMQException, InterruptedException + { + _sentMessages.set(0); + final int numProducers = 10; + final int numMessages = 100; + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",6000); + arguments.put("x-qpid-flow-resume-capacity",3000); + + ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments); + + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); + consumerConnection.start(); + + Connection[] producers = new Connection[numProducers]; + for(int i = 0 ; i < numProducers; i ++) + { + + producers[i] = getConnection(); + producers[i].start(); + Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer myproducer = session.createProducer(queue); + MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L); + } + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + for(int j = 0; j < numProducers * numMessages; j++) + { + + Message msg = consumer.receive(5000); + Thread.sleep(50L); + assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg); + + } + + + + Message msg = consumer.receive(500); + assertNull("extra message received", msg); + + + for(int i = 0; i < numProducers; i++) + { + producers[i].close(); + } + + } + + + + public void testSendTimeout() + throws JMSException, NamingException, AMQException, InterruptedException + { + long origTimeoutValue = Long.getLong("qpid.flow_control_wait_failure",AMQSession.DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + System.setProperty("qpid.flow_control_wait_failure","3000"); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).declareAndBind((AMQDestination)queue); + producer = session.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(10000); + + Exception e = sender.getException(); + + assertNotNull("No timeout exception on sending", e); + + System.setProperty("qpid.flow_control_wait_failure",String.valueOf(origTimeoutValue)); + + + + } + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + long sleepPeriod) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod); + new Thread(sender).start(); + return sender; + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + _sentMessages.incrementAndGet(); + + try + { + Thread.sleep(sleepPeriod); + } + catch (InterruptedException e) + { + } + } + } + + private static final byte[] BYTE_300 = new byte[300]; + + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_300); + send.setIntProperty("msg", msg); + + return send; + } + + + private class MessageSender implements Runnable + { + private final MessageProducer _producer; + private final Session _producerSession; + private final int _numMessages; + + + + private JMSException _exception; + private long _sleepPeriod; + + public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + { + _producer = producer; + _producerSession = producerSession; + _numMessages = numMessages; + _sleepPeriod = sleepPeriod; + } + + public void run() + { + try + { + sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod); + } + catch (JMSException e) + { + _exception = e; + } + } + + public JMSException getException() + { + return _exception; + } + } +}
\ No newline at end of file diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index 62b54d3086..2e625f95c0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -61,7 +61,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase setupSession(); - _queue = _clientSession.createQueue(getName()+System.currentTimeMillis()); + _queue = _clientSession.createQueue(getTestQueueName()); _clientSession.createConsumer(_queue).close(); //Ensure there are no messages on the queue to start with. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java index 39e2b892a9..ff766c907d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java @@ -22,64 +22,172 @@ package org.apache.qpid.test.client; import org.apache.qpid.test.utils.*; import javax.jms.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import junit.framework.ComparisonFailure; +import junit.framework.AssertionFailedError; /** - * RollbackOrderTest + * RollbackOrderTest, QPID-1864, QPID-1871 + * + * Description: + * + * The problem that this test is exposing is that the dispatcher used to be capable + * of holding on to a message when stopped. This ment that when the rollback was + * called and the dispatcher stopped it may have hold of a message. So after all + * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue) + * have been cleared the client still had a single message, the one the + * dispatcher was holding on to. + * + * As a result the TxRollback operation would run and then release the dispatcher. + * Whilst the dispatcher would then proceed to reject the message it was holiding + * the Broker would already have resent that message so the rejection would silently + * fail. + * + * And the client would receieve that single message 'early', depending on the + * number of messages already recevied when rollback was called. + * + * + * Aims: + * + * The tests puts 50 messages on to the queue. + * + * The test then tries to cause the dispatcher to stop whilst it is in the process + * of moving a message from the preDeliveryQueue to a consumers sychronousQueue. + * + * To exercise this path we have 50 message flowing to the client to give the + * dispatcher a bit of work to do moving messages. + * + * Then we loop - 10 times + * - Validating that the first message received is always message 1. + * - Receive a few more so that there are a few messages to reject. + * - call rollback, to try and catch the dispatcher mid process. + * + * Outcome: + * + * The hope is that we catch the dispatcher mid process and cause a BasicReject + * to fail. Which will be indicated in the log but will also cause that failed + * rejected message to be the next to be delivered which will not be message 1 + * as expected. + * + * We are testing a race condition here but we can check through the log file if + * the race condition occured. However, performing that check will only validate + * the problem exists and will not be suitable as part of a system test. * */ - public class RollbackOrderTest extends QpidTestCase { - private Connection conn; - private Queue queue; - private Session ssn; - private MessageProducer prod; - private MessageConsumer cons; + private Connection _connection; + private Queue _queue; + private Session _session; + private MessageConsumer _consumer; @Override public void setUp() throws Exception { super.setUp(); - conn = getConnection(); - conn.start(); - ssn = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); - queue = ssn.createQueue("rollback-order-test-queue"); - prod = ssn.createProducer(queue); - cons = ssn.createConsumer(queue); - for (int i = 0; i < 5; i++) - { - TextMessage msg = ssn.createTextMessage("message " + (i+1)); - prod.send(msg); - } - ssn.commit(); + _connection = getConnection(); + + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _queue = _session.createQueue(getTestQueueName()); + _consumer = _session.createConsumer(_queue); + + //Send more messages so it is more likely that the dispatcher is + // processing on rollback. + sendMessage(_session, _queue, 50); + _session.commit(); + } public void testOrderingAfterRollback() throws Exception { - for (int i = 0; i < 10; i++) + //Start the session now so we + _connection.start(); + + for (int i = 0; i < 20; i++) { - TextMessage msg = (TextMessage) cons.receive(); - assertEquals("message 1", msg.getText()); - ssn.rollback(); + Message msg = _consumer.receive(); + assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX)); + + // Pull additional messages through so we have some reject work to do + for (int m=0; m < 5 ; m++) + { + _consumer.receive(); + } + + System.err.println("ROT-Rollback"); + _logger.warn("ROT-Rollback"); + _session.rollback(); } } - @Override public void tearDown() throws Exception + public void testOrderingAfterRollbackOnMessage() throws Exception { - while (true) + final CountDownLatch count= new CountDownLatch(20); + final Exception exceptions[] = new Exception[20]; + final AtomicBoolean failed = new AtomicBoolean(false); + + _consumer.setMessageListener(new MessageListener() { - Message msg = cons.receiveNoWait(); - if (msg == null) + + public void onMessage(Message message) { - break; + + Message msg = message; + try + { + count.countDown(); + assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX)); + + _session.rollback(); + } + catch (JMSException e) + { + System.out.println("Error:" + e.getMessage()); + exceptions[(int)count.getCount()] = e; + } + catch (AssertionFailedError cf) + { + // End Test if Equality test fails + while (count.getCount() != 0) + { + count.countDown(); + } + + System.out.println("Error:" + cf.getMessage()); + System.err.println(cf.getMessage()); + cf.printStackTrace(); + failed.set(true); + } } - else + }); + //Start the session now so we + _connection.start(); + + count.await(); + + for (Exception e : exceptions) + { + if (e != null) { - msg.acknowledge(); + System.err.println(e.getMessage()); + e.printStackTrace(); + failed.set(true); } } - ssn.commit(); + +// _consumer.close(); + _connection.close(); + + assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get()); + } + + @Override public void tearDown() throws Exception + { + + drainQueue(_queue); + super.tearDown(); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index d76d3858b3..1ec39bd1e0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -22,9 +22,7 @@ package org.apache.qpid.test.client.message; import java.util.concurrent.CountDownLatch; -import javax.jms.Connection; import javax.jms.DeliveryMode; -import javax.jms.Destination; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; @@ -35,15 +33,12 @@ import javax.jms.Session; import junit.framework.Assert; -import org.apache.log4j.BasicConfigurator; -import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +48,6 @@ public class SelectorTest extends QpidTestCase implements MessageListener private AMQConnection _connection; private AMQDestination _destination; - private AMQSession _session; private int count; public String _connectionString = "vm://:1"; private static final String INVALID_SELECTOR = "Cost LIKE 5"; @@ -66,40 +60,12 @@ public class SelectorTest extends QpidTestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - BasicConfigurator.configure(); init((AMQConnection) getConnection("guest", "guest")); } - protected void tearDown() throws Exception - { - super.tearDown(); - if (_session != null) - { - try - { - _session.close(); - } - catch (JMSException e) - { - fail("Error cleaning up:" + e.getMessage()); - } - } - if (_connection != null) - { - try - { - _connection.close(); - } - catch (JMSException e) - { - fail("Error cleaning up:" + e.getMessage()); - } - } - } - private void init(AMQConnection connection) throws JMSException { - init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true)); + init(connection, new AMQQueue(connection, getTestQueueName(), true)); } private void init(AMQConnection connection, AMQDestination destination) throws JMSException @@ -107,23 +73,27 @@ public class SelectorTest extends QpidTestCase implements MessageListener _connection = connection; _destination = destination; connection.start(); + } + + public void onMessage(Message message) + { + count++; + _logger.info("Got Message:" + message); + _responseLatch.countDown(); + } - String selector = null; - selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; + public void testUsingOnMessage() throws Exception + { + String selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; - _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + Session session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); // _session.createConsumer(destination).setMessageListener(this); - _session.createConsumer(destination, selector).setMessageListener(this); - } + session.createConsumer(_destination, selector).setMessageListener(this); - public void test() throws Exception - { try { - init((AMQConnection) getConnection("guest", "guest", randomize("Client"))); - - Message msg = _session.createTextMessage("Message"); + Message msg = session.createTextMessage("Message"); msg.setJMSPriority(1); msg.setIntProperty("Cost", 2); msg.setStringProperty("property-with-hyphen", "wibble"); @@ -131,7 +101,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener _logger.info("Sending Message:" + msg); - ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT); + ((BasicMessageProducer) session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT); _logger.info("Message sent, waiting for response..."); _responseLatch.await(); @@ -163,40 +133,18 @@ public class SelectorTest extends QpidTestCase implements MessageListener { _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage()); } - catch (URLSyntaxException e) - { - _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage()); - fail("Wrong exception"); - } - catch (AMQException e) - { - _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage()); - fail("Wrong exception"); - } - finally - { - if (_session != null) - { - _session.close(); - } - if (_connection != null) - { - _connection.close(); - } - } } public void testUnparsableSelectors() throws Exception { - Connection connection = getConnection("guest", "guest", randomize("Client")); - _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + AMQSession session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); boolean caught = false; //Try Creating a Browser try { - _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR); + session.createBrowser(session.createQueue("Ping"), INVALID_SELECTOR); } catch (JMSException e) { @@ -213,7 +161,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener //Try Creating a Consumer try { - _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR); + session.createConsumer(session.createQueue("Ping"), INVALID_SELECTOR); } catch (JMSException e) { @@ -230,7 +178,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener //Try Creating a Receiever try { - _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR); + session.createReceiver(session.createQueue("Ping"), INVALID_SELECTOR); } catch (JMSException e) { @@ -246,7 +194,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener try { - _session.createReceiver(_session.createQueue("Ping"), BAD_MATHS_SELECTOR); + session.createReceiver(session.createQueue("Ping"), BAD_MATHS_SELECTOR); } catch (JMSException e) { @@ -264,9 +212,10 @@ public class SelectorTest extends QpidTestCase implements MessageListener public void testRuntimeSelectorError() throws JMSException { - MessageConsumer consumer = _session.createConsumer(_destination , "testproperty % 5 = 1"); - MessageProducer producer = _session.createProducer(_destination); - Message sentMsg = _session.createTextMessage(); + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1"); + MessageProducer producer = session.createProducer(_destination); + Message sentMsg = session.createTextMessage(); sentMsg.setIntProperty("testproperty", 1); // 1 % 5 producer.send(sentMsg); @@ -289,9 +238,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener public void testSelectorWithJMSMessageID() throws Exception { - Connection conn = getConnection(); - conn.start(); - Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); + Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); MessageProducer prod = session.createProducer(_destination); MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL"); @@ -332,18 +279,6 @@ public class SelectorTest extends QpidTestCase implements MessageListener Assert.assertNotNull("Msg5 should not be null", msg5); } - public void onMessage(Message message) - { - count++; - _logger.info("Got Message:" + message); - _responseLatch.countDown(); - } - - private static String randomize(String in) - { - return in + System.currentTimeMillis(); - } - public static void main(String[] argv) throws Exception { SelectorTest test = new SelectorTest(); @@ -357,7 +292,7 @@ public class SelectorTest extends QpidTestCase implements MessageListener { test.setUp(); } - test.test(); + test.testUsingOnMessage(); if (test._connectionString.contains("vm://:1")) { @@ -371,9 +306,4 @@ public class SelectorTest extends QpidTestCase implements MessageListener e.printStackTrace(); } } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(SelectorTest.class); - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index 1f90f1e29f..6b69ccab6c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Destination; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; @@ -105,7 +106,8 @@ public class JMSPropertiesTest extends QpidTestCase assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType()); assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo()); assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:")); - + assertEquals("JMS Default priority should be 4",Message.DEFAULT_PRIORITY,rm.getJMSPriority()); + //Validate that the JMSX values are correct assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID")); assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq")); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 1bef07fcd5..440bc31fe9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -22,13 +22,11 @@ package org.apache.qpid.test.utils; import javax.jms.Connection; -import org.apache.qpid.util.FileUtils; - public class FailoverBaseCase extends QpidTestCase { public static int FAILING_VM_PORT = 2; - public static int FAILING_PORT = DEFAULT_PORT + 100; + public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt")); protected int failingPort; @@ -54,7 +52,7 @@ public class FailoverBaseCase extends QpidTestCase protected void setUp() throws java.lang.Exception { super.setUp(); - setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort()); + setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK")+"/"+getFailingPort()); startBroker(failingPort); } @@ -78,7 +76,6 @@ public class FailoverBaseCase extends QpidTestCase { stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT); super.tearDown(); - FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort()); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index bed5d3242a..666c97c9de 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.store.DerbyMessageStore; import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Level; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,11 +71,14 @@ public class QpidTestCase extends TestCase protected final String QpidHome = System.getProperty("QPID_HOME"); protected File _configFile = new File(System.getProperty("broker.config")); - private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class); + protected static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class); protected long RECEIVE_TIMEOUT = 1000l; - private Map<String, String> _setProperties = new HashMap<String, String>(); + private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>(); + private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>(); + private Map<org.apache.log4j.Logger, Level> _loggerLevelSetForTest = new HashMap<org.apache.log4j.Logger, Level>(); + private XMLConfiguration _testConfiguration = new XMLConfiguration(); /** @@ -147,6 +151,7 @@ public class QpidTestCase extends TestCase private static final String BROKER_LANGUAGE = "broker.language"; private static final String BROKER = "broker"; private static final String BROKER_CLEAN = "broker.clean"; + private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests"; private static final String BROKER_VERSION = "broker.version"; protected static final String BROKER_READY = "broker.ready"; private static final String BROKER_STOPPED = "broker.stopped"; @@ -169,6 +174,7 @@ public class QpidTestCase extends TestCase protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA); protected String _broker = System.getProperty(BROKER, VM); private String _brokerClean = System.getProperty(BROKER_CLEAN, null); + private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS); private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); private String _output = System.getProperty(TEST_OUTPUT); @@ -187,6 +193,8 @@ public class QpidTestCase extends TestCase public static final String TOPIC = "topic"; /** Map to hold test defined environment properties */ private Map<String, String> _env; + protected static final String INDEX = "index"; + ; public QpidTestCase(String name) { @@ -235,6 +243,19 @@ public class QpidTestCase extends TestCase { _logger.error("exception stopping broker", e); } + + if(_brokerCleanBetweenTests) + { + try + { + cleanBroker(); + } + catch (Exception e) + { + _logger.error("exception cleaning up broker", e); + } + } + _logger.info("========== stop " + _testName + " =========="); if (redirected) @@ -451,6 +472,15 @@ public class QpidTestCase extends TestCase env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\""); env.put("QPID_WORK", System.getProperty("QPID_WORK")); + + // Use the environment variable to set amqj.logging.level for the broker + // The value used is a 'server' value in the test configuration to + // allow a differentiation between the client and broker logging levels. + if (System.getProperty("amqj.server.logging.level") != null) + { + setBrokerEnvironment("AMQJ_LOGGING_LEVEL", System.getProperty("amqj.server.logging.level")); + } + // Add all the environment settings the test requested if (!_env.isEmpty()) { @@ -460,13 +490,27 @@ public class QpidTestCase extends TestCase } } + + // Add default test logging levels that are used by the log4j-test + // Use the convenience methods to push the current logging setting + // in to the external broker's QPID_OPTS string. + if (System.getProperty("amqj.protocol.logging.level") != null) + { + setSystemProperty("amqj.protocol.logging.level"); + } + if (System.getProperty("root.logging.level") != null) + { + setSystemProperty("root.logging.level"); + } + + String QPID_OPTS = " "; // Add all the specified system properties to QPID_OPTS - if (!_setProperties.isEmpty()) + if (!_propertiesSetForBroker.isEmpty()) { - for (String key : _setProperties.keySet()) + for (String key : _propertiesSetForBroker.keySet()) { - QPID_OPTS += "-D" + key + "=" + System.getProperty(key) + " "; + QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " "; } if (env.containsKey("QPID_OPTS")) @@ -489,7 +533,7 @@ public class QpidTestCase extends TestCase if (!p.await(30, TimeUnit.SECONDS)) { - _logger.info("broker failed to become ready:" + p.getStopLine()); + _logger.info("broker failed to become ready (" + p.ready + "):" + p.getStopLine()); //Ensure broker has stopped process.destroy(); cleanBroker(); @@ -667,28 +711,87 @@ public class QpidTestCase extends TestCase } /** + * Set a System property that is to be applied only to the external test + * broker. + * + * This is a convenience method to enable the setting of a -Dproperty=value + * entry in QPID_OPTS + * + * This is only useful for the External Java Broker tests. + * + * @param property the property name + * @param value the value to set the property to + */ + protected void setBrokerOnlySystemProperty(String property, String value) + { + if (!_propertiesSetForBroker.containsKey(property)) + { + _propertiesSetForBroker.put(property, value); + } + + } + + /** + * Set a System (-D) property for this test run. + * + * This convenience method copies the current VMs System Property + * for the external VM Broker. + * + * @param property the System property to set + */ + protected void setSystemProperty(String property) + { + setSystemProperty(property, System.getProperty(property)); + } + + /** * Set a System property for the duration of this test. * * When the test run is complete the value will be reverted. * + * The values set using this method will also be propogated to the external + * Java Broker via a -D value defined in QPID_OPTS. + * + * If the value should not be set on the broker then use + * setTestClientSystemProperty(). + * * @param property the property to set * @param value the new value to use */ protected void setSystemProperty(String property, String value) { - if (!_setProperties.containsKey(property)) + // Record the value for the external broker + _propertiesSetForBroker.put(property, value); + + //Set the value for the test client vm aswell. + setTestClientSystemProperty(property, value); + } + + /** + * Set a System (-D) property for the external Broker of this test. + * + * @param property The property to set + * @param value the value to set it to. + */ + protected void setTestClientSystemProperty(String property, String value) + { + if (!_propertiesSetForTestOnly.containsKey(property)) { - _setProperties.put(property, System.getProperty(property)); - } + // Record the current value so we can revert it later. + _propertiesSetForTestOnly.put(property, System.getProperty(property)); + } System.setProperty(property, value); } + /** + * Restore the System property values that were set before this test run. + */ protected void revertSystemProperties() { - for (String key : _setProperties.keySet()) + for (String key : _propertiesSetForTestOnly.keySet()) { - String value = _setProperties.get(key); + String value = _propertiesSetForTestOnly.get(key); if (value != null) { System.setProperty(key, value); @@ -698,6 +801,12 @@ public class QpidTestCase extends TestCase System.clearProperty(key); } } + + _propertiesSetForTestOnly.clear(); + + // We don't change the current VMs settings for Broker only properties + // so we can just clear this map + _propertiesSetForBroker.clear(); } /** @@ -712,6 +821,40 @@ public class QpidTestCase extends TestCase } /** + * Adjust the VMs Log4j Settings just for this test run + * + * @param logger the logger to change + * @param level the level to set + */ + protected void setLoggerLevel(org.apache.log4j.Logger logger, Level level) + { + assertNotNull("Cannot set level of null logger", logger); + assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level); + + if (!_loggerLevelSetForTest.containsKey(logger)) + { + // Record the current value so we can revert it later. + _loggerLevelSetForTest.put(logger, logger.getLevel()); + } + + logger.setLevel(level); + } + + /** + * Restore the logging levels defined by this test. + */ + protected void revertLoggingLevels() + { + for (org.apache.log4j.Logger logger : _loggerLevelSetForTest.keySet()) + { + logger.setLevel(_loggerLevelSetForTest.get(logger)); + } + + _loggerLevelSetForTest.clear(); + + } + + /** * Check whether the broker is an 0.8 * * @return true if the broker is an 0_8 version, false otherwise. @@ -885,6 +1028,7 @@ public class QpidTestCase extends TestCase } revertSystemProperties(); + revertLoggingLevels(); } /** @@ -961,7 +1105,12 @@ public class QpidTestCase extends TestCase public Message createNextMessage(Session session, int msgCount) throws JMSException { - return session.createMessage(); + Message message = session.createMessage(); + + message.setIntProperty(INDEX, msgCount); + + return message; + } public ConnectionURL getConnectionURL() throws NamingException diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes index f03d62667d..488ae24022 100644..100755 --- a/qpid/java/test-profiles/010Excludes +++ b/qpid/java/test-profiles/010Excludes @@ -92,3 +92,10 @@ org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#* // QPID-2084 : this test needs more work for 0-10 org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#* + +// QPID-2118 : 0-10 Java client has differrent error handling to 0-8 code path +org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError + +//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker) +org.apache.qpid.server.queue.ProducerFlowControlTest#* + diff --git a/qpid/java/test-profiles/08Excludes b/qpid/java/test-profiles/08Excludes index b277c6d929..6f3898384d 100644 --- a/qpid/java/test-profiles/08Excludes +++ b/qpid/java/test-profiles/08Excludes @@ -14,8 +14,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* // QPID-1823: this takes ages to run org.apache.qpid.client.SessionCreateTest#* -org.apache.qpid.test.client.RollbackOrderTest#* - // QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable org.apache.qpid.management.jmx.ManagementActorLoggingTest#* org.apache.qpid.server.queue.ModelTest#* diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes index de876e06bb..ee781fb80f 100644 --- a/qpid/java/test-profiles/08StandaloneExcludes +++ b/qpid/java/test-profiles/08StandaloneExcludes @@ -39,8 +39,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* // QPID-1823: this takes ages to run org.apache.qpid.client.SessionCreateTest#* -org.apache.qpid.test.client.RollbackOrderTest#* - // This test requires the standard configuration file for validation. // Excluding here does not reduce test coverage. org.apache.qpid.server.configuration.ServerConfigurationFileTest#* diff --git a/qpid/java/test-profiles/default.testprofile b/qpid/java/test-profiles/default.testprofile index 86a5b2efb3..28127e5c4c 100644 --- a/qpid/java/test-profiles/default.testprofile +++ b/qpid/java/test-profiles/default.testprofile @@ -11,11 +11,14 @@ max_prefetch=1000 log=debug amqj.logging.level=${log} +amqj.server.logging.level=${log} amqj.protocol.logging.level=${log} root.logging.level=warn log4j.configuration=file:///${test.profiles}/log4j-test.xml log4j.debug=false +# Note test-provider.properties also has variables of same name. +# Keep in sync test.port=15672 test.mport=18999 #Note : Management will start open second port on: mport + 100 : 19099 diff --git a/qpid/java/test-profiles/log4j-test.xml b/qpid/java/test-profiles/log4j-test.xml index 0aaa7d8686..2d77942a81 100644 --- a/qpid/java/test-profiles/log4j-test.xml +++ b/qpid/java/test-profiles/log4j-test.xml @@ -29,10 +29,10 @@ <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> <appender name="console" class="org.apache.log4j.ConsoleAppender"> <param name="Target" value="System.out"/> + <param name="ImmediateFlush" value="true"/> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%t %d %p [%c{4}] %m%n"/> </layout> - <param name="ImmediateFlush" value="true"/> </appender> <logger name="org.apache.qpid"> @@ -46,6 +46,10 @@ <logger name="org.apache.qpid.test.utils.QpidTestCase"> <level value="ALL"/> </logger> + + <logger name="org.apache.commons"> + <level value="WARN"/> + </logger> <root> <level value="${root.logging.level}"/> diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties index e5cb18da0b..a349b0fcbf 100644 --- a/qpid/java/test-profiles/test-provider.properties +++ b/qpid/java/test-profiles/test-provider.properties @@ -19,10 +19,14 @@ # # -test.port=5672 -test.port.ssl=5671 -test.port.alt=5772 -test.port.alt.ssl=5771 +# Copied from default.testprofile +test.port=15672 +test.mport=18999 +#Note : Java Management will start open second port on: mport + 100 : 19099 +test.port.ssl=15671 +test.port.alt=25672 +test.port.alt.ssl=25671 + connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}' connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1' |