diff options
Diffstat (limited to 'qpid/java/broker/src/main')
25 files changed, 1029 insertions, 602 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index add5e64ee8..644a33db01 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -27,13 +27,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; @@ -42,12 +41,7 @@ import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.UnauthorizedAccessException; +import org.apache.qpid.server.queue.*; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; @@ -59,6 +53,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; @@ -119,6 +114,11 @@ public class AMQChannel private final AMQProtocolSession _session; private boolean _closing; + private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + + private final AtomicBoolean _blocking = new AtomicBoolean(false); + + private LogActor _actor; private LogSubject _logSubject; @@ -783,16 +783,32 @@ public class AMQChannel return _unacknowledgedMessageMap; } - + /** + * Called from the ChannelFlowHandler to suspend this Channel + * @param suspended boolean, should this Channel be suspended + */ public void setSuspended(boolean suspended) { - - boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { - _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started")); + // Log Flow Started before we start the subscriptions + if (!suspended) + { + _actor.message(_logSubject, ChannelMessages.CHN_1002("Started")); + } + + + // This section takes two different approaches to perform to perform + // the same function. Ensuring that the Subscription has taken note + // of the change in Channel State + // Here we have become unsuspended and so we ask each the queue to + // perform an Async delivery for each of the subscriptions in this + // Channel. The alternative would be to ensure that the subscription + // had received the change in suspension state. That way the logic + // behind decieding to start an async delivery was located with the + // Subscription. if (wasSuspended) { // may need to deliver queued messages @@ -801,6 +817,38 @@ public class AMQChannel s.getQueue().deliverAsync(s); } } + + + // Here we have become suspended so we need to ensure that each of + // the Subscriptions has noticed this change so that we can be sure + // they are not still sending messages. Again the code here is a + // very simplistic approach to ensure that the change of suspension + // has been noticed by each of the Subscriptions. Unlike the above + // case we don't actually need to do anything else. + if (!wasSuspended) + { + // may need to deliver queued messages + for (Subscription s : _tag2SubscriptionMap.values()) + { + try + { + s.getSendLock(); + } + finally + { + s.releaseSendLock(); + } + } + } + + + // Log Suspension only after we have confirmed all suspensions are + // stopped. + if (suspended) + { + _actor.message(_logSubject, ChannelMessages.CHN_1002("Stopped")); + } + } } @@ -931,4 +979,37 @@ public class AMQChannel { return _actor; } + + public void block(AMQQueue queue) + { + if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + { + + if(_blocking.compareAndSet(false,true)) + { + _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString())); + flow(false); + } + } + } + + public void unblock(AMQQueue queue) + { + if(_blockingQueues.remove(queue)) + { + if(_blocking.compareAndSet(true,false)) + { + _actor.message(_logSubject, ChannelMessages.CHN_1006()); + + flow(true); + } + } + } + + private void flow(boolean flow) + { + MethodRegistry methodRegistry = _session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); + _session.writeFrame(responseBody.generateFrame(_channelId)); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 74bb7ee969..5c73e353de 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -108,4 +108,14 @@ public class QueueConfiguration return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap()); } + public long getCapacity() + { + return _config.getLong("capacity", _vHostConfig.getCapacity()); + } + + public long getFlowResumeCapacity() + { + return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity()); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index a72c2889d1..641b44bb18 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -104,6 +104,8 @@ public class ServerConfiguration implements SignalHandler envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth"); envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize"); envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap"); + envVarMap.put("QPID_QUEUECAPACITY", "capacity"); + envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity"); envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer"); envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer"); envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay"); @@ -290,7 +292,6 @@ public class ServerConfiguration implements SignalHandler return conf; } - @Override public void handle(Signal arg0) { try @@ -508,6 +509,16 @@ public class ServerConfiguration implements SignalHandler return getConfig().getLong("minimumAlertRepeatGap", 0); } + public long getCapacity() + { + return getConfig().getLong("capacity", 0L); + } + + public long getFlowResumeCapacity() + { + return getConfig().getLong("flowResumeCapacity", getCapacity()); + } + public int getProcessors() { return getConfig().getInt("connector.processors", 4); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 0273a13262..6c72025ec2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -166,4 +166,15 @@ public class VirtualHostConfiguration return _config.getLong("queues.minimumAlertRepeatGap", 0); } + + public long getCapacity() + { + return _config.getLong("queues.capacity", 0l); + } + + public long getFlowResumeCapacity() + { + return _config.getLong("queues.flowResumeCapacity", getCapacity()); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java index 1541d3d892..9954719866 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java @@ -35,13 +35,11 @@ public class ConfigurationManagementMBean extends AMQManagedObject implements Co super(ConfigurationManagement.class, ConfigurationManagement.TYPE, ConfigurationManagement.VERSION); } - @Override public String getObjectInstanceName() { return ConfigurationManagement.TYPE; } - @Override public void reloadSecurityConfiguration() throws Exception { ApplicationRegistry.getInstance().getConfiguration().reparseConfigFile(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index c04b6c559c..620799a81f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -73,7 +73,6 @@ public class DefaultExchangeFactory implements ExchangeFactory return e; } - @Override public void initialise(VirtualHostConfiguration hostConfig) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index f3cab10ed7..fcf3fd4337 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -71,7 +71,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); // throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); - } + } else { if (message.isQueueDeleted()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 5d7adc6371..9918013888 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -54,6 +54,13 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); + + // Protect the broker against out of order frame request. + if (virtualHost == null) + { + throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null); + } + final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java index 4502710dd6..0059a48c06 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java @@ -60,4 +60,9 @@ public abstract class AbstractActor implements LogActor return _rootLogger; } + public String toString() + { + return _logString; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java index 9b928accc0..5d2762fd1d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java @@ -36,4 +36,12 @@ public class BrokerActor extends AbstractActor _logString = "[Broker] "; } + + public BrokerActor(String name, RootMessageLogger logger) + { + super(logger); + + _logString = "[Broker(" + name + ")] "; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties index 5ced7cc0b9..9169a1a651 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties @@ -249,12 +249,16 @@ CHN-1003 = Close # 0 - bytes allowed in prefetch # 1 - number of messagse. CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number} +CHN-1005 = Flow Control Enforced (Queue {0}) +CHN-1006 = Flow Control Removed #Queue # 0 - owner # 1 - priority QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}] QUE-1002 = Deleted +QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number} +QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number} #Exchange # 0 - type @@ -269,4 +273,4 @@ BND-1002 = Deleted #Subscription SUB-1001 = Create[ : Durable][ : Arguments : {0}] SUB-1002 = Close -SUB-1003 = State : {0}
\ No newline at end of file +SUB-1003 = State : {0} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 2a692344d0..184504717e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -160,6 +160,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void setMinimumAlertRepeatGap(long value); + long getCapacity(); + + void setCapacity(long capacity); + + + long getFlowResumeCapacity(); + + void setFlowResumeCapacity(long flowResumeCapacity); + + + void deleteMessageFromTop(StoreContext storeContext) throws AMQException; long clearQueue(StoreContext storeContext) throws AMQException; @@ -180,6 +191,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void stop(); + void checkCapacity(AMQChannel channel); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription * already exists. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 7509350e65..267ccf43ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -26,11 +26,105 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Map; +import java.util.HashMap; + public class AMQQueueFactory { public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); + private abstract static class QueueProperty + { + + private final AMQShortString _argumentName; + + + public QueueProperty(String argumentName) + { + _argumentName = new AMQShortString(argumentName); + } + + public AMQShortString getArgumentName() + { + return _argumentName; + } + + + public abstract void setPropertyValue(AMQQueue queue, Object value); + + } + + private abstract static class QueueLongProperty extends QueueProperty + { + + public QueueLongProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).longValue()); + } + + } + + abstract void setPropertyValue(AMQQueue queue, long value); + + + } + + private static final QueueProperty[] DECLAREABLE_PROPERTIES = { + new QueueLongProperty("x-qpid-maximum-message-age") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageAge(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-size") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageSize(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-count") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageCount(value); + } + }, + new QueueLongProperty("x-qpid-minimum-alert-repeat-gap") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMinimumAlertRepeatGap(value); + } + }, + new QueueLongProperty("x-qpid-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setCapacity(value); + } + }, + new QueueLongProperty("x-qpid-flow-resume-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setFlowResumeCapacity(value); + } + } + + }; + + + public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, @@ -53,6 +147,18 @@ public class AMQQueueFactory //Register the new queue virtualHost.getQueueRegistry().registerQueue(q); q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString())); + + if(arguments != null) + { + for(QueueProperty p : DECLAREABLE_PROPERTIES) + { + if(arguments.containsKey(p.getArgumentName())) + { + p.setPropertyValue(q, arguments.get(p.getArgumentName())); + } + } + } + return q; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index dbad5438dc..3b58f05f93 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -42,8 +42,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; - + private final AMQMessage _message; private Set<Subscription> _rejectedBy = null; @@ -177,13 +176,21 @@ public class QueueEntryImpl implements QueueEntry public String debugIdentity() { - return getMessage().debugIdentity(); + AMQMessage message = getMessage(); + if (message == null) + { + return "null"; + } + else + { + return message.debugIdentity(); + } } public boolean immediateAndNotDelivered() { - return _message.immediateAndNotDelivered(); + return getMessage().immediateAndNotDelivered(); } public void setRedelivered(boolean b) @@ -385,4 +392,5 @@ public class QueueEntryImpl implements QueueEntry { return _queueEntryList; } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b14b92b014..d781dc4dea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1,11 +1,10 @@ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -35,6 +34,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.QueueMessages; +import org.apache.qpid.server.AMQChannel; /* * @@ -96,6 +96,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final Executor _asyncDelivery; private final AtomicLong _totalMessagesReceived = new AtomicLong(); + private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); + /** max allowed size(KB) of a single message */ public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); @@ -122,6 +124,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private LogSubject _logSubject; private LogActor _logActor; + + private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); + private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); + private final AtomicBoolean _overfull = new AtomicBoolean(false); + protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { @@ -508,8 +515,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { _deliveredMessages.incrementAndGet(); + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity()); + } sub.send(entry); - } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) @@ -626,6 +636,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new FailedDequeueException(_name.toString(), e); } + checkCapacity(); + } private void decrementQueueSize(final QueueEntry entry) @@ -1170,11 +1182,64 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public void deliverAsync() + public void checkCapacity(AMQChannel channel) { - _stateChangeCount.incrementAndGet(); + if(_capacity != 0l) + { + if(_atomicQueueSize.get() > _capacity) + { + _overfull.set(true); + //Overfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity)); + + if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) + { + channel.block(this); + } + + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + + //Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + + channel.unblock(this); + _blockedChannels.remove(channel); + + } + + } + + + + } + } + + private void checkCapacity() + { + if(_capacity != 0L) + { + if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) + { + if(_overfull.compareAndSet(true,false)) + {//Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + } - Runner runner = new Runner(); + + for(AMQChannel c : _blockedChannels.keySet()) + { + c.unblock(this); + _blockedChannels.remove(c); + } + } + } + } + + + public void deliverAsync() + { + Runner runner = new Runner(_stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1187,13 +1252,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(new SubFlushRunner(sub)); } + private class Runner implements ReadWriteRunnable { + String _name; + public Runner(long count) + { + _name = "QueueRunner-" + count + "-" + _logActor; + } + public void run() { + String originalName = Thread.currentThread().getName(); try { + Thread.currentThread().setName(_name); CurrentActor.set(_logActor); + processQueue(this); } catch (AMQException e) @@ -1203,9 +1278,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener finally { CurrentActor.remove(); + Thread.currentThread().setName(originalName); } - - } public boolean isRead() @@ -1217,6 +1291,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return true; } + + public String toString() + { + return _name; + } } private class SubFlushRunner implements ReadWriteRunnable @@ -1230,27 +1309,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void run() { - boolean complete = false; - try - { - CurrentActor.set(_sub.getLogActor()); - complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); - } - catch (AMQException e) - { - _logger.error(e); + String originalName = Thread.currentThread().getName(); + try{ + Thread.currentThread().setName("SubFlushRunner-"+_sub); + + boolean complete = false; + try + { + CurrentActor.set(_sub.getLogActor()); + complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); + + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + } + if (!complete && !_sub.isSuspended()) + { + _asyncDelivery.execute(this); + } } finally { - CurrentActor.remove(); - } - if (!complete && !_sub.isSuspended()) - { - _asyncDelivery.execute(this); + Thread.currentThread().setName(originalName); } - } public boolean isRead() @@ -1278,7 +1366,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { sub.getSendLock(); - atTail = attemptDelivery(sub); + atTail = attemptDelivery(sub); if (atTail && sub.isAutoClose()) { unregisterSubscription(sub); @@ -1308,63 +1396,81 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return atTail; } + /** + * Attempt delivery for the given subscription. + * + * Looks up the next node for the subscription and attempts to deliver it. + * + * @param sub + * @return true if we have completed all possible deliveries for this sub. + * @throws AMQException + */ private boolean attemptDelivery(Subscription sub) throws AMQException { boolean atTail = false; boolean advanced = false; - boolean subActive = sub.isActive(); + boolean subActive = sub.isActive() && !sub.isSuspended(); if (subActive) { QueueEntry node = moveSubscriptionToNextNode(sub); + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity()); + } if (!(node.isAcquired() || node.isDeleted())) { - if (!sub.isSuspended()) + if (sub.hasInterest(node)) { - if (sub.hasInterest(node)) + if (!sub.wouldSuspend(node)) { - if (!sub.wouldSuspend(node)) + if (!sub.isBrowser() && !node.acquire(sub)) { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - } - else + sub.restoreCredit(node); + } + else + { + deliverMessage(sub, node); + + if (sub.isBrowser()) { - deliverMessage(sub, node); + QueueEntry newNode = _entries.next(node); - if (sub.isBrowser()) + if (newNode != null) { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - advanced = true; - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } + advanced = true; + sub.setLastSeenEntry(node, newNode); + node = sub.getLastSeenEntry(); } + } - - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - node.addStateChangeListener(new QueueEntryListener(sub, node)); } + } - else + else // Not enough Credit for message and wouldSuspend { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } + //QPID-1187 - Treat the subscription as suspended for this message + // and wait for the message to be removed to continue delivery. + + // 2009-09-30 : MR : setting subActive = false only causes, this + // particular delivery attempt to end. This is called from + // flushSubscription and processQueue both of which attempt + // delivery a number of times. Won't a bytes limited + // subscriber with not enough credit for the next message + // create a lot of new QELs? How about a browser that calls + // this method LONG.MAX_LONG times! + subActive = false; + node.addStateChangeListener(new QueueEntryListener(sub, node)); + } + } + else + { + // this subscription is not interested in this node so we can skip over it + QueueEntry newNode = _entries.next(node); + if (newNode != null) + { + sub.setLastSeenEntry(node, newNode); } } - } atTail = (_entries.next(node) == null) && !advanced; } @@ -1409,6 +1515,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity())); + } + return node; } @@ -1423,6 +1535,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asynchronousRunner.compareAndSet(runner, null); + // For every message enqueue/requeue the we fire deliveryAsync() which + // increases _stateChangeCount. If _sCC changes whilst we are in our loop + // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) + // then we will continue to run for a maximum of iterations. + // So whilst delivery/rejection is going on a processQueue thread will be running while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) { // we want to have one extra loop after every subscription has reached the point where it cannot move @@ -1442,20 +1559,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //iterate over the subscribers and try to advance their pointer while (subscriptionIter.advance()) { - boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); sub.getSendLock(); try { - if (sub != null) - { - - QueueEntry node = moveSubscriptionToNextNode(sub); - if (node != null) - { - done = attemptDelivery(sub); - } - } + done = attemptDelivery(sub); if (done) { if (extraLoops == 0) @@ -1492,11 +1600,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rescheduling runner:" + runner); + } _asyncDelivery.execute(runner); } } - @Override public void checkMessageStatus() throws AMQException { @@ -1604,6 +1715,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public long getCapacity() + { + return _capacity; + } + + public void setCapacity(long capacity) + { + _capacity = capacity; + } + + public long getFlowResumeCapacity() + { + return _flowResumeCapacity; + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + _flowResumeCapacity = flowResumeCapacity; + } + + public Set<NotificationCheck> getNotificationChecks() { return _notificationChecks; @@ -1673,6 +1805,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setMaximumMessageSize(config.getMaximumMessageSize()); setMaximumMessageCount(config.getMaximumMessageCount()); setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + _capacity = config.getCapacity(); + _flowResumeCapacity = config.getFlowResumeCapacity(); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 9575bfa1ec..1affdd6590 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -102,7 +102,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { - instance.initialise(); + instance.initialise(instanceID); } catch (Exception e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index a049d1eb09..831f928832 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -42,18 +42,22 @@ import java.io.File; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { + private String _registryName; public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException { super(new ServerConfiguration(configurationURL)); } - public void initialise() throws Exception + public void initialise(int instanceID) throws Exception { _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger()); + + _registryName = String.valueOf(instanceID); + // Set the Actor for current log messages - CurrentActor.set(new BrokerActor(_rootMessageLogger)); + CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger)); CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion())); @@ -83,7 +87,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry public void close() throws Exception { //Set the Actor for Broker Shutdown - CurrentActor.set(new BrokerActor(_rootMessageLogger)); + CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger)); try { super.close(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index ddb3fce5d2..92accb3499 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -43,8 +43,9 @@ public interface IApplicationRegistry * Initialise the application registry. All initialisation must be done in this method so that any components * that need access to the application registry itself for initialisation are able to use it. Attempting to * initialise in the constructor will lead to failures since the registry reference will not have been set. + * @param instanceID the instanceID that we can use to identify this AR. */ - void initialise() throws Exception; + void initialise(int instanceID) throws Exception; /** * Shutdown this Registry diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java index f852514444..d2bbb795e9 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java @@ -20,16 +20,17 @@ */ package org.apache.qpid.server.security.access; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; -import org.apache.qpid.server.exchange.Exchange; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; public class PrincipalPermissions { @@ -41,6 +42,7 @@ public class PrincipalPermissions private static final Object CREATE_QUEUES_KEY = new Object(); private static final Object CREATE_EXCHANGES_KEY = new Object(); + private static final Object CREATE_QUEUE_TEMPORARY_KEY = new Object(); private static final Object CREATE_QUEUE_QUEUES_KEY = new Object(); private static final Object CREATE_QUEUE_EXCHANGES_KEY = new Object(); @@ -80,248 +82,257 @@ public class PrincipalPermissions { switch (permission) { - case ACCESS: - break; // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS - case BIND: - break; // All the details are currently included in the create setup. case CONSUME: // Parameters : AMQShortString queueName, Boolean Temporary, Boolean ownQueueOnly - Map consumeRights = (Map) _permissions.get(permission); - - if (consumeRights == null) - { - consumeRights = new ConcurrentHashMap(); - _permissions.put(permission, consumeRights); - } - - //if we have parametsre - if (parameters.length > 0) - { - AMQShortString queueName = (AMQShortString) parameters[0]; - Boolean temporary = (Boolean) parameters[1]; - Boolean ownQueueOnly = (Boolean) parameters[2]; - - if (temporary) - { - consumeRights.put(CONSUME_TEMPORARY_KEY, true); - } - else - { - consumeRights.put(CONSUME_TEMPORARY_KEY, false); - } - - if (ownQueueOnly) - { - consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true); - } - else - { - consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false); - } - - - LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY); - if (queues == null) - { - queues = new LinkedList(); - consumeRights.put(CONSUME_QUEUES_KEY, queues); - } - - if (queueName != null) - { - queues.add(queueName); - } - } - - + grantConsume(permission, parameters); break; case CREATEQUEUE: // Parameters : Boolean temporary, AMQShortString queueName // , AMQShortString exchangeName , AMQShortString routingKey - - Map createRights = (Map) _permissions.get(permission); - - if (createRights == null) - { - createRights = new ConcurrentHashMap(); - _permissions.put(permission, createRights); - - } - - //The existence of the empty map mean permission to all. - if (parameters.length == 0) - { - return; - } - - Boolean temporary = (Boolean) parameters[0]; - - AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null; - AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null; - //Set the routingkey to the specified value or the queueName if present - AMQShortString routingKey = (parameters.length > 3 && null != parameters[3]) ? (AMQShortString) parameters[3] : queueName; - - // Get the queues map - Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY); - - if (create_queues == null) - { - create_queues = new ConcurrentHashMap(); - createRights.put(CREATE_QUEUES_KEY, create_queues); - } - - //Allow all temp queues to be created - create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary); - - //Create empty list of queues - Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY); - - if (create_queues_queues == null) - { - create_queues_queues = new ConcurrentHashMap(); - create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues); - } - - // We are granting CREATE rights to all temporary queues only - if (parameters.length == 1) - { - return; - } - - // if we have a queueName then we need to store any associated exchange / rk bindings - if (queueName != null) - { - Map queue = (Map) create_queues_queues.get(queueName); - if (queue == null) - { - queue = new ConcurrentHashMap(); - create_queues_queues.put(queueName, queue); - } - - if (exchangeName != null) - { - queue.put(exchangeName, routingKey); - } - - //If no exchange is specified then the presence of the queueName in the map says any exchange is ok - } - - // Store the exchange that we are being granted rights to. This will be used as part of binding - - //Lookup the list of exchanges - Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY); - - if (create_queues_exchanges == null) - { - create_queues_exchanges = new ConcurrentHashMap(); - create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges); - } - - //if we have an exchange - if (exchangeName != null) - { - //Retrieve the list of permitted exchanges. - Map exchanges = (Map) create_queues_exchanges.get(exchangeName); - - if (exchanges == null) - { - exchanges = new ConcurrentHashMap(); - create_queues_exchanges.put(exchangeName, exchanges); - } - - //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY - exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary); - - //Store the binding details of queue/rk for this exchange. - if (queueName != null) - { - //Retrieve the list of permitted routingKeys. - Map rKeys = (Map) exchanges.get(exchangeName); - - if (rKeys == null) - { - rKeys = new ConcurrentHashMap(); - exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys); - } - - rKeys.put(queueName, routingKey); - } - } + grantCreateQueue(permission, parameters); break; case CREATEEXCHANGE: // Parameters AMQShortString exchangeName , AMQShortString Class - Map rights = (Map) _permissions.get(permission); - if (rights == null) - { - rights = new ConcurrentHashMap(); - _permissions.put(permission, rights); - } - - Map create_exchanges = (Map) rights.get(CREATE_EXCHANGES_KEY); - if (create_exchanges == null) - { - create_exchanges = new ConcurrentHashMap(); - rights.put(CREATE_EXCHANGES_KEY, create_exchanges); - } - - //Should perhaps error if parameters[0] is null; - AMQShortString name = parameters.length > 0 ? (AMQShortString) parameters[0] : null; - AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : new AMQShortString("direct"); - - //Store the exchangeName / class mapping if the mapping is null - rights.put(name, className); - break; - case DELETE: + grantCreateExchange(permission, parameters); break; - case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey - Map publishRights = (Map) _permissions.get(permission); - - if (publishRights == null) - { - publishRights = new ConcurrentHashMap(); - _permissions.put(permission, publishRights); - } - - if (parameters == null || parameters.length == 0) - { - //If we have no parameters then allow publish to all destinations - // this is signified by having a null value for publish_exchanges - } - else - { - Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY); - - if (publish_exchanges == null) - { - publish_exchanges = new ConcurrentHashMap(); - publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges); - } - - - HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]); - - // Check to see if we have a routing key - if (parameters.length == 2) - { - if (routingKeys == null) - { - routingKeys = new HashSet<AMQShortString>(); - } - //Add routing key to permitted publish destinations - routingKeys.add(parameters[1]); - } - - // Add the updated routingkey list or null if all values allowed - publish_exchanges.put(parameters[0], routingKeys); - } + grantPublish(permission, parameters); break; + /* The other cases just fall through to no-op */ + case DELETE: + case ACCESS: // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS + case BIND: // All the details are currently included in the create setup. case PURGE: - break; case UNBIND: break; } } + private void grantPublish(Permission permission, Object... parameters) { + Map publishRights = (Map) _permissions.get(permission); + + if (publishRights == null) + { + publishRights = new ConcurrentHashMap(); + _permissions.put(permission, publishRights); + } + + if (parameters == null || parameters.length == 0) + { + //If we have no parameters then allow publish to all destinations + // this is signified by having a null value for publish_exchanges + } + else + { + Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY); + + if (publish_exchanges == null) + { + publish_exchanges = new ConcurrentHashMap(); + publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges); + } + + + HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]); + + // Check to see if we have a routing key + if (parameters.length == 2) + { + if (routingKeys == null) + { + routingKeys = new HashSet<AMQShortString>(); + } + //Add routing key to permitted publish destinations + routingKeys.add(parameters[1]); + } + + // Add the updated routingkey list or null if all values allowed + publish_exchanges.put(parameters[0], routingKeys); + } + } + + private void grantCreateExchange(Permission permission, Object... parameters) { + Map rights = (Map) _permissions.get(permission); + if (rights == null) + { + rights = new ConcurrentHashMap(); + _permissions.put(permission, rights); + } + + Map create_exchanges = (Map) rights.get(CREATE_EXCHANGES_KEY); + if (create_exchanges == null) + { + create_exchanges = new ConcurrentHashMap(); + rights.put(CREATE_EXCHANGES_KEY, create_exchanges); + } + + //Should perhaps error if parameters[0] is null; + AMQShortString name = parameters.length > 0 ? (AMQShortString) parameters[0] : null; + AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : new AMQShortString("direct"); + + //Store the exchangeName / class mapping if the mapping is null + rights.put(name, className); + } + + private void grantCreateQueue(Permission permission, Object... parameters) { + Map createRights = (Map) _permissions.get(permission); + + if (createRights == null) + { + createRights = new ConcurrentHashMap(); + _permissions.put(permission, createRights); + + } + + //The existence of the empty map mean permission to all. + if (parameters.length == 0) + { + return; + } + + Boolean temporary = (Boolean) parameters[0]; + + AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null; + AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null; + //Set the routingkey to the specified value or the queueName if present + AMQShortString routingKey = (parameters.length > 3 && null != parameters[3]) ? (AMQShortString) parameters[3] : queueName; + + // Get the queues map + Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY); + + if (create_queues == null) + { + create_queues = new ConcurrentHashMap(); + createRights.put(CREATE_QUEUES_KEY, create_queues); + } + + //Allow all temp queues to be created + create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary); + + //Create empty list of queues + Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY); + + if (create_queues_queues == null) + { + create_queues_queues = new ConcurrentHashMap(); + create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues); + } + + // We are granting CREATE rights to all temporary queues only + if (parameters.length == 1) + { + return; + } + + // if we have a queueName then we need to store any associated exchange / rk bindings + if (queueName != null) + { + Map queue = (Map) create_queues_queues.get(queueName); + if (queue == null) + { + queue = new ConcurrentHashMap(); + create_queues_queues.put(queueName, queue); + } + + if (exchangeName != null) + { + queue.put(exchangeName, routingKey); + } + + //If no exchange is specified then the presence of the queueName in the map says any exchange is ok + } + + // Store the exchange that we are being granted rights to. This will be used as part of binding + + //Lookup the list of exchanges + Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY); + + if (create_queues_exchanges == null) + { + create_queues_exchanges = new ConcurrentHashMap(); + create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges); + } + + //if we have an exchange + if (exchangeName != null) + { + //Retrieve the list of permitted exchanges. + Map exchanges = (Map) create_queues_exchanges.get(exchangeName); + + if (exchanges == null) + { + exchanges = new ConcurrentHashMap(); + create_queues_exchanges.put(exchangeName, exchanges); + } + + //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY + exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary); + + //Store the binding details of queue/rk for this exchange. + if (queueName != null) + { + //Retrieve the list of permitted routingKeys. + Map rKeys = (Map) exchanges.get(exchangeName); + + if (rKeys == null) + { + rKeys = new ConcurrentHashMap(); + exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys); + } + + rKeys.put(queueName, routingKey); + } + } + } + + private void grantConsume(Permission permission, Object... parameters) { + Map consumeRights = (Map) _permissions.get(permission); + + if (consumeRights == null) + { + consumeRights = new ConcurrentHashMap(); + _permissions.put(permission, consumeRights); + } + + //if we have parametsre + if (parameters.length > 0) + { + AMQShortString queueName = (AMQShortString) parameters[0]; + Boolean temporary = (Boolean) parameters[1]; + Boolean ownQueueOnly = (Boolean) parameters[2]; + + if (temporary) + { + consumeRights.put(CONSUME_TEMPORARY_KEY, true); + } + else + { + consumeRights.put(CONSUME_TEMPORARY_KEY, false); + } + + if (ownQueueOnly) + { + consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true); + } + else + { + consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false); + } + + + LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY); + if (queues == null) + { + queues = new LinkedList(); + consumeRights.put(CONSUME_QUEUES_KEY, queues); + } + + if (queueName != null) + { + queues.add(queueName); + } + } + } + /** * * @param permission the type of permission to check @@ -346,267 +357,286 @@ public class PrincipalPermissions return AuthzResult.ALLOWED; // This is here for completeness but the SimpleXML ACLManager never calls it. // The existence of this user specific PP can be validated in the map SimpleXML maintains. case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey - - Exchange exchange = (Exchange) parameters[1]; - - AMQQueue bind_queueName = (AMQQueue) parameters[2]; - AMQShortString routingKey = (AMQShortString) parameters[3]; - - //Get all Create Rights for this user - Map bindCreateRights = (Map) _permissions.get(Permission.CREATEQUEUE); - - //Look up the Queue Creation Rights - Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY); - - //Lookup the list of queues - Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY); - - // Check and see if we have a queue white list to check - if (bind_create_queues_queues != null) - { - //There a white list for queues - Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName); - - if (exchangeDetails == null) //Then all queue can be bound to all exchanges. - { - return AuthzResult.ALLOWED; - } - - // Check to see if we have a white list of routingkeys to check - Map rkeys = (Map) exchangeDetails.get(exchange.getName()); - - // if keys is null then any rkey is allowed on this exchange - if (rkeys == null) - { - // There is no routingkey white list - return AuthzResult.ALLOWED; - } - else - { - // We have routingKeys so a match must be found to allowed binding - Iterator keys = rkeys.keySet().iterator(); - - boolean matched = false; - while (keys.hasNext() && !matched) - { - AMQShortString rkey = (AMQShortString) keys.next(); - if (rkey.endsWith("*")) - { - matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString()); - } - else - { - matched = routingKey.equals(rkey); - } - } - - - return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED; - } - - - } - else - { - //There a is no white list for queues - - // So can allow all queues to be bound - // but we should first check and see if we have a temp queue and validate that we are allowed - // to bind temp queues. - - //Check to see if we have a temporary queue - if (bind_queueName.isAutoDelete()) - { - // Check and see if we have an exchange white list. - Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY); - - // If the exchange exists then we must check to see if temporary queues are allowed here - if (bind_exchanges != null) - { - // Check to see if the requested exchange is allowed. - Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName()); - - return ((Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED; - } - - //no white list so all allowed, drop through to return true below. - } - - // not a temporary queue and no white list so all allowed. - return AuthzResult.ALLOWED; - } - + return authoriseBind(parameters); case CREATEQUEUE:// Parameters : boolean autodelete, AMQShortString name - - Map createRights = (Map) _permissions.get(permission); - - // If there are no create rights then deny request - if (createRights == null) - { - return AuthzResult.DENIED; - } - - //Look up the Queue Creation Rights - Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY); - - //Lookup the list of queues allowed to be created - Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY); - - - AMQShortString queueName = (AMQShortString) parameters[1]; - Boolean autoDelete = (Boolean) parameters[0]; - - if (autoDelete)// we have a temporary queue - { - return ((Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED; - } - else - { - // If there is a white list then check - if (create_queues_queues == null || create_queues_queues.containsKey(queueName)) - { - return AuthzResult.ALLOWED; - } - else - { - return AuthzResult.DENIED; - } - - } + return authoriseCreateQueue(permission, parameters); case CREATEEXCHANGE: - Map rights = (Map) _permissions.get(permission); - - AMQShortString exchangeName = (AMQShortString) parameters[0]; - - // If the exchange list is doesn't exist then all is allowed else - // check the valid exchanges - if (rights == null || rights.containsKey(exchangeName)) - { - return AuthzResult.ALLOWED; - } - else - { - return AuthzResult.DENIED; - } + return authoriseCreateExchange(permission, parameters); case CONSUME: // Parameters : AMQQueue - - if (parameters.length == 1 && parameters[0] instanceof AMQQueue) - { - AMQQueue queue = ((AMQQueue) parameters[0]); - Map queuePermissions = (Map) _permissions.get(permission); - - List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY); - - Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY); - Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY); - - // If user is allowed to publish to temporary queues and this is a temp queue then allow it. - if (temporayQueues) - { - if (queue.isAutoDelete()) - // This will allow consumption from any temporary queue including ones not owned by this user. - // Of course the exclusivity will not be broken. - { - // if not limited to ownQueuesOnly then ok else check queue Owner. - return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED; - } - else - { - return AuthzResult.DENIED; - } - } - - // if queues are white listed then ensure it is ok - if (queues != null) - { - // if no queues are listed then ALL are ok othereise it must be specified. - if (ownQueuesOnly) - { - if (queue.getOwner().equals(_user)) - { - return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED; - } - else - { - return AuthzResult.DENIED; - } - } - - // If we are - return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED; - } - } - - // Can't authenticate without the right parameters - return AuthzResult.DENIED; - case DELETE: - break; - + return authoriseConsume(permission, parameters); case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey - Map publishRights = (Map) _permissions.get(permission); - - if (publishRights == null) - { - return AuthzResult.DENIED; - } - - Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY); - - // Having no exchanges listed gives full publish rights to all exchanges - if (exchanges == null) - { - return AuthzResult.ALLOWED; - } - // Otherwise exchange must be listed in the white list - - // If the map doesn't have the exchange then it isn't allowed - if (!exchanges.containsKey(((Exchange) parameters[0]).getName())) - { - return AuthzResult.DENIED; - } - else - { - - // Get valid routing keys - HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName()); - - // Having no routingKeys in the map then all are allowed. - if (routingKeys == null) - { - return AuthzResult.ALLOWED; - } - else - { - // We have routingKeys so a match must be found to allowed binding - Iterator keys = routingKeys.iterator(); - - - AMQShortString publishRKey = (AMQShortString)parameters[1]; - - boolean matched = false; - while (keys.hasNext() && !matched) - { - AMQShortString rkey = (AMQShortString) keys.next(); - - if (rkey.endsWith("*")) - { - matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1)); - } - else - { - matched = publishRKey.equals(rkey); - } - } - return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED; - } - } + return authorisePublish(permission, parameters); + /* Fall through */ + case DELETE: case PURGE: - break; case UNBIND: - break; - + default: + return AuthzResult.DENIED; } - return AuthzResult.DENIED; } + + private AuthzResult authoriseConsume(Permission permission, Object... parameters) { + if (parameters.length == 1 && parameters[0] instanceof AMQQueue) + { + AMQQueue queue = ((AMQQueue) parameters[0]); + Map queuePermissions = (Map) _permissions.get(permission); + + if (queuePermissions == null) + { + //if the outer map is null, the user has no CONSUME rights at all + return AuthzResult.DENIED; + } + + List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY); + + Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY); + Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY); + + // If user is allowed to publish to temporary queues and this is a temp queue then allow it. + if (temporayQueues) + { + if (queue.isAutoDelete()) + // This will allow consumption from any temporary queue including ones not owned by this user. + // Of course the exclusivity will not be broken. + { + // if not limited to ownQueuesOnly then ok else check queue Owner. + return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED; + } + else + { + return AuthzResult.DENIED; + } + } + + // if queues are white listed then ensure it is ok + if (queues != null) + { + // if no queues are listed then ALL are ok othereise it must be specified. + if (ownQueuesOnly) + { + if (queue.getOwner().equals(_user)) + { + return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED; + } + else + { + return AuthzResult.DENIED; + } + } + + // If we are + return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED; + } + } + + // Can't authenticate without the right parameters + return AuthzResult.DENIED; + } + + private AuthzResult authorisePublish(Permission permission, Object... parameters) { + Map publishRights = (Map) _permissions.get(permission); + + if (publishRights == null) + { + return AuthzResult.DENIED; + } + + Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY); + + // Having no exchanges listed gives full publish rights to all exchanges + if (exchanges == null) + { + return AuthzResult.ALLOWED; + } + // Otherwise exchange must be listed in the white list + + // If the map doesn't have the exchange then it isn't allowed + if (!exchanges.containsKey(((Exchange) parameters[0]).getName())) + { + return AuthzResult.DENIED; + } + else + { + + // Get valid routing keys + HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName()); + + // Having no routingKeys in the map then all are allowed. + if (routingKeys == null) + { + return AuthzResult.ALLOWED; + } + else + { + // We have routingKeys so a match must be found to allowed binding + Iterator keys = routingKeys.iterator(); + + + AMQShortString publishRKey = (AMQShortString)parameters[1]; + + boolean matched = false; + while (keys.hasNext() && !matched) + { + AMQShortString rkey = (AMQShortString) keys.next(); + + if (rkey.endsWith("*")) + { + matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1)); + } + else + { + matched = publishRKey.equals(rkey); + } + } + return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED; + } + } + } + + private AuthzResult authoriseCreateExchange(Permission permission, Object... parameters) { + Map rights = (Map) _permissions.get(permission); + + AMQShortString exchangeName = (AMQShortString) parameters[0]; + + // If the exchange list is doesn't exist then all is allowed else + // check the valid exchanges + if (rights == null || rights.containsKey(exchangeName)) + { + return AuthzResult.ALLOWED; + } + else + { + return AuthzResult.DENIED; + } + } + + private AuthzResult authoriseCreateQueue(Permission permission, Object... parameters) { + Map createRights = (Map) _permissions.get(permission); + + // If there are no create rights then deny request + if (createRights == null) + { + return AuthzResult.DENIED; + } + + //Look up the Queue Creation Rights + Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY); + + //Lookup the list of queues allowed to be created + Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY); + + + AMQShortString queueName = (AMQShortString) parameters[1]; + Boolean autoDelete = (Boolean) parameters[0]; + + if (autoDelete)// we have a temporary queue + { + return ((Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED; + } + else + { + // If there is a white list then check + if (create_queues_queues == null || create_queues_queues.containsKey(queueName)) + { + return AuthzResult.ALLOWED; + } + else + { + return AuthzResult.DENIED; + } + + } + } + + private AuthzResult authoriseBind(Object... parameters) { + Exchange exchange = (Exchange) parameters[1]; + + AMQQueue bind_queueName = (AMQQueue) parameters[2]; + AMQShortString routingKey = (AMQShortString) parameters[3]; + + //Get all Create Rights for this user + Map bindCreateRights = (Map) _permissions.get(Permission.CREATEQUEUE); + + //Look up the Queue Creation Rights + Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY); + + //Lookup the list of queues + Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY); + + // Check and see if we have a queue white list to check + if (bind_create_queues_queues != null) + { + //There a white list for queues + Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName); + + if (exchangeDetails == null) //Then all queue can be bound to all exchanges. + { + return AuthzResult.ALLOWED; + } + + // Check to see if we have a white list of routingkeys to check + Map rkeys = (Map) exchangeDetails.get(exchange.getName()); + + // if keys is null then any rkey is allowed on this exchange + if (rkeys == null) + { + // There is no routingkey white list + return AuthzResult.ALLOWED; + } + else + { + // We have routingKeys so a match must be found to allowed binding + Iterator keys = rkeys.keySet().iterator(); + + boolean matched = false; + while (keys.hasNext() && !matched) + { + AMQShortString rkey = (AMQShortString) keys.next(); + if (rkey.endsWith("*")) + { + matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString()); + } + else + { + matched = routingKey.equals(rkey); + } + } + + + return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED; + } + + + } + else + { + //There a is no white list for queues + + // So can allow all queues to be bound + // but we should first check and see if we have a temp queue and validate that we are allowed + // to bind temp queues. + + //Check to see if we have a temporary queue + if (bind_queueName.isAutoDelete()) + { + // Check and see if we have an exchange white list. + Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY); + + // If the exchange exists then we must check to see if temporary queues are allowed here + if (bind_exchanges != null) + { + // Check to see if the requested exchange is allowed. + Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName()); + + return ((Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED; + } + + //no white list so all allowed, drop through to return true below. + } + + // not a temporary queue and no white list so all allowed. + return AuthzResult.ALLOWED; + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java index a6fae053c2..ca6b68dafa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java @@ -35,28 +35,24 @@ public abstract class BasicACLPlugin implements ACLPlugin // Returns true or false if the plugin should authorise or deny the request protected abstract AuthzResult getResult(); - @Override public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue, AMQShortString routingKey) { return getResult(); } - @Override public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost) { return getResult(); } - @Override public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue) { return getResult(); } - @Override public AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal, boolean nowait, AMQQueue queue) @@ -64,7 +60,6 @@ public abstract class BasicACLPlugin implements ACLPlugin return getResult(); } - @Override public AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable, AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, @@ -73,7 +68,6 @@ public abstract class BasicACLPlugin implements ACLPlugin return getResult(); } - @Override public AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable, boolean exclusive, boolean nowait, boolean passive, AMQShortString queue) @@ -81,19 +75,16 @@ public abstract class BasicACLPlugin implements ACLPlugin return getResult(); } - @Override public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue) { return getResult(); } - @Override public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange) { return getResult(); } - @Override public AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory, AMQShortString routingKey, Exchange e) @@ -101,20 +92,17 @@ public abstract class BasicACLPlugin implements ACLPlugin return getResult(); } - @Override public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue) { return getResult(); } - @Override public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey, AMQQueue queue) { return getResult(); } - @Override public void setConfiguration(Configuration config) { // no-op diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java index 3a81932123..7450322130 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java @@ -211,7 +211,6 @@ public class FirewallPlugin extends AbstractACLPlugin } - @Override public void setConfiguration(Configuration config) throws ConfigurationException { // Get default action diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java index 4efe381a8b..8658101cd8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java @@ -42,7 +42,6 @@ public class PropertiesPrincipalDatabaseManager implements PrincipalDatabaseMana return _databases; } - @Override public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException { //todo diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 72d6afc65c..2893e916cc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -630,11 +630,19 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public QueueEntry getLastSeenEntry() { - return _queueContext.get(); + QueueEntry entry = _queueContext.get(); + + if(_logger.isDebugEnabled()) + { + _logger.debug(_logActor + ": lastSeenEntry: " + (entry == null ? "null" : entry.debugIdentity())); + } + + return entry; } public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue) { + _logger.debug(debugIdentity() + " Setting Last Seen To:" + (newvalue == null ? "nullNV" : newvalue.debugIdentity())); return _queueContext.compareAndSet(expected,newvalue); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 3c71282c57..450852cef7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -97,6 +97,7 @@ public class LocalTransactionalContext implements TransactionalContext try { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); + _queue.checkCapacity(_channel); if(entry.immediateAndNotDelivered()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 28af36e3db..10d6021d27 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -91,6 +91,8 @@ public class NonTransactionalContext implements TransactionalContext public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException { QueueEntry entry = queue.enqueue(_storeContext, message); + queue.checkCapacity(_channel); + //following check implements the functionality //required by the 'immediate' flag: |