summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java109
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java106
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java278
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java3
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java1002
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java2
25 files changed, 1029 insertions, 602 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index add5e64ee8..644a33db01 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -27,13 +27,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -42,12 +41,7 @@ import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -59,6 +53,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
@@ -119,6 +114,11 @@ public class AMQChannel
private final AMQProtocolSession _session;
private boolean _closing;
+ private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+ private final AtomicBoolean _blocking = new AtomicBoolean(false);
+
+
private LogActor _actor;
private LogSubject _logSubject;
@@ -783,16 +783,32 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
-
+ /**
+ * Called from the ChannelFlowHandler to suspend this Channel
+ * @param suspended boolean, should this Channel be suspended
+ */
public void setSuspended(boolean suspended)
{
-
-
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
- _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+ // Log Flow Started before we start the subscriptions
+ if (!suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Started"));
+ }
+
+
+ // This section takes two different approaches to perform to perform
+ // the same function. Ensuring that the Subscription has taken note
+ // of the change in Channel State
+ // Here we have become unsuspended and so we ask each the queue to
+ // perform an Async delivery for each of the subscriptions in this
+ // Channel. The alternative would be to ensure that the subscription
+ // had received the change in suspension state. That way the logic
+ // behind decieding to start an async delivery was located with the
+ // Subscription.
if (wasSuspended)
{
// may need to deliver queued messages
@@ -801,6 +817,38 @@ public class AMQChannel
s.getQueue().deliverAsync(s);
}
}
+
+
+ // Here we have become suspended so we need to ensure that each of
+ // the Subscriptions has noticed this change so that we can be sure
+ // they are not still sending messages. Again the code here is a
+ // very simplistic approach to ensure that the change of suspension
+ // has been noticed by each of the Subscriptions. Unlike the above
+ // case we don't actually need to do anything else.
+ if (!wasSuspended)
+ {
+ // may need to deliver queued messages
+ for (Subscription s : _tag2SubscriptionMap.values())
+ {
+ try
+ {
+ s.getSendLock();
+ }
+ finally
+ {
+ s.releaseSendLock();
+ }
+ }
+ }
+
+
+ // Log Suspension only after we have confirmed all suspensions are
+ // stopped.
+ if (suspended)
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1002("Stopped"));
+ }
+
}
}
@@ -931,4 +979,37 @@ public class AMQChannel
{
return _actor;
}
+
+ public void block(AMQQueue queue)
+ {
+ if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+ {
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString()));
+ flow(false);
+ }
+ }
+ }
+
+ public void unblock(AMQQueue queue)
+ {
+ if(_blockingQueues.remove(queue))
+ {
+ if(_blocking.compareAndSet(true,false))
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1006());
+
+ flow(true);
+ }
+ }
+ }
+
+ private void flow(boolean flow)
+ {
+ MethodRegistry methodRegistry = _session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
+ _session.writeFrame(responseBody.generateFrame(_channelId));
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 74bb7ee969..5c73e353de 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -108,4 +108,14 @@ public class QueueConfiguration
return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap());
}
+ public long getCapacity()
+ {
+ return _config.getLong("capacity", _vHostConfig.getCapacity());
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index a72c2889d1..641b44bb18 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -104,6 +104,8 @@ public class ServerConfiguration implements SignalHandler
envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth");
envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize");
envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap");
+ envVarMap.put("QPID_QUEUECAPACITY", "capacity");
+ envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity");
envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer");
envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
@@ -290,7 +292,6 @@ public class ServerConfiguration implements SignalHandler
return conf;
}
- @Override
public void handle(Signal arg0)
{
try
@@ -508,6 +509,16 @@ public class ServerConfiguration implements SignalHandler
return getConfig().getLong("minimumAlertRepeatGap", 0);
}
+ public long getCapacity()
+ {
+ return getConfig().getLong("capacity", 0L);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return getConfig().getLong("flowResumeCapacity", getCapacity());
+ }
+
public int getProcessors()
{
return getConfig().getInt("connector.processors", 4);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 0273a13262..6c72025ec2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -166,4 +166,15 @@ public class VirtualHostConfiguration
return _config.getLong("queues.minimumAlertRepeatGap", 0);
}
+
+ public long getCapacity()
+ {
+ return _config.getLong("queues.capacity", 0l);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("queues.flowResumeCapacity", getCapacity());
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java
index 1541d3d892..9954719866 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagementMBean.java
@@ -35,13 +35,11 @@ public class ConfigurationManagementMBean extends AMQManagedObject implements Co
super(ConfigurationManagement.class, ConfigurationManagement.TYPE, ConfigurationManagement.VERSION);
}
- @Override
public String getObjectInstanceName()
{
return ConfigurationManagement.TYPE;
}
- @Override
public void reloadSecurityConfiguration() throws Exception
{
ApplicationRegistry.getInstance().getConfiguration().reparseConfigFile();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index c04b6c559c..620799a81f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -73,7 +73,6 @@ public class DefaultExchangeFactory implements ExchangeFactory
return e;
}
- @Override
public void initialise(VirtualHostConfiguration hostConfig)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index f3cab10ed7..fcf3fd4337 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -71,7 +71,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
- }
+ }
else
{
if (message.isQueueDeleted())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 5d7adc6371..9918013888 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,6 +54,13 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
+
+ // Protect the broker against out of order frame request.
+ if (virtualHost == null)
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+ }
+
final AMQChannel channel = new AMQChannel(session,channelId,
virtualHost.getMessageStore());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
index 4502710dd6..0059a48c06 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
@@ -60,4 +60,9 @@ public abstract class AbstractActor implements LogActor
return _rootLogger;
}
+ public String toString()
+ {
+ return _logString;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
index 9b928accc0..5d2762fd1d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
@@ -36,4 +36,12 @@ public class BrokerActor extends AbstractActor
_logString = "[Broker] ";
}
+
+ public BrokerActor(String name, RootMessageLogger logger)
+ {
+ super(logger);
+
+ _logString = "[Broker(" + name + ")] ";
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
index 5ced7cc0b9..9169a1a651 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
@@ -249,12 +249,16 @@ CHN-1003 = Close
# 0 - bytes allowed in prefetch
# 1 - number of messagse.
CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
#Queue
# 0 - owner
# 1 - priority
QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
#Exchange
# 0 - type
@@ -269,4 +273,4 @@ BND-1002 = Deleted
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
-SUB-1003 = State : {0} \ No newline at end of file
+SUB-1003 = State : {0}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 2a692344d0..184504717e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -160,6 +160,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void setMinimumAlertRepeatGap(long value);
+ long getCapacity();
+
+ void setCapacity(long capacity);
+
+
+ long getFlowResumeCapacity();
+
+ void setFlowResumeCapacity(long flowResumeCapacity);
+
+
+
void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
long clearQueue(StoreContext storeContext) throws AMQException;
@@ -180,6 +191,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void stop();
+ void checkCapacity(AMQChannel channel);
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 7509350e65..267ccf43ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -26,11 +26,105 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+import java.util.HashMap;
+
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ private abstract static class QueueProperty
+ {
+
+ private final AMQShortString _argumentName;
+
+
+ public QueueProperty(String argumentName)
+ {
+ _argumentName = new AMQShortString(argumentName);
+ }
+
+ public AMQShortString getArgumentName()
+ {
+ return _argumentName;
+ }
+
+
+ public abstract void setPropertyValue(AMQQueue queue, Object value);
+
+ }
+
+ private abstract static class QueueLongProperty extends QueueProperty
+ {
+
+ public QueueLongProperty(String argumentName)
+ {
+ super(argumentName);
+ }
+
+ public void setPropertyValue(AMQQueue queue, Object value)
+ {
+ if(value instanceof Number)
+ {
+ setPropertyValue(queue, ((Number)value).longValue());
+ }
+
+ }
+
+ abstract void setPropertyValue(AMQQueue queue, long value);
+
+
+ }
+
+ private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
+ new QueueLongProperty("x-qpid-maximum-message-age")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageAge(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-size")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageSize(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-count")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageCount(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMinimumAlertRepeatGap(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setCapacity(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-flow-resume-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setFlowResumeCapacity(value);
+ }
+ }
+
+ };
+
+
+
public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
@@ -53,6 +147,18 @@ public class AMQQueueFactory
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+
+ if(arguments != null)
+ {
+ for(QueueProperty p : DECLAREABLE_PROPERTIES)
+ {
+ if(arguments.containsKey(p.getArgumentName()))
+ {
+ p.setPropertyValue(q, arguments.get(p.getArgumentName()));
+ }
+ }
+ }
+
return q;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index dbad5438dc..3b58f05f93 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -42,8 +42,7 @@ public class QueueEntryImpl implements QueueEntry
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
-
+ private final AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
@@ -177,13 +176,21 @@ public class QueueEntryImpl implements QueueEntry
public String debugIdentity()
{
- return getMessage().debugIdentity();
+ AMQMessage message = getMessage();
+ if (message == null)
+ {
+ return "null";
+ }
+ else
+ {
+ return message.debugIdentity();
+ }
}
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return getMessage().immediateAndNotDelivered();
}
public void setRedelivered(boolean b)
@@ -385,4 +392,5 @@ public class QueueEntryImpl implements QueueEntry
{
return _queueEntryList;
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b14b92b014..d781dc4dea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1,11 +1,10 @@
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +34,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.AMQChannel;
/*
*
@@ -96,6 +96,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final Executor _asyncDelivery;
private final AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
/** max allowed size(KB) of a single message */
public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -122,6 +124,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private LogSubject _logSubject;
private LogActor _logActor;
+
+ private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+ private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+ private final AtomicBoolean _overfull = new AtomicBoolean(false);
+
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -508,8 +515,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
_deliveredMessages.incrementAndGet();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity());
+ }
sub.send(entry);
-
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
@@ -626,6 +636,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new FailedDequeueException(_name.toString(), e);
}
+ checkCapacity();
+
}
private void decrementQueueSize(final QueueEntry entry)
@@ -1170,11 +1182,64 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- public void deliverAsync()
+ public void checkCapacity(AMQChannel channel)
{
- _stateChangeCount.incrementAndGet();
+ if(_capacity != 0l)
+ {
+ if(_atomicQueueSize.get() > _capacity)
+ {
+ _overfull.set(true);
+ //Overfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
+
+ if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
+ {
+ channel.block(this);
+ }
+
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+
+ //Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+ channel.unblock(this);
+ _blockedChannels.remove(channel);
+
+ }
+
+ }
+
+
+
+ }
+ }
+
+ private void checkCapacity()
+ {
+ if(_capacity != 0L)
+ {
+ if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+ if(_overfull.compareAndSet(true,false))
+ {//Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+ }
- Runner runner = new Runner();
+
+ for(AMQChannel c : _blockedChannels.keySet())
+ {
+ c.unblock(this);
+ _blockedChannels.remove(c);
+ }
+ }
+ }
+ }
+
+
+ public void deliverAsync()
+ {
+ Runner runner = new Runner(_stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1187,13 +1252,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(new SubFlushRunner(sub));
}
+
private class Runner implements ReadWriteRunnable
{
+ String _name;
+ public Runner(long count)
+ {
+ _name = "QueueRunner-" + count + "-" + _logActor;
+ }
+
public void run()
{
+ String originalName = Thread.currentThread().getName();
try
{
+ Thread.currentThread().setName(_name);
CurrentActor.set(_logActor);
+
processQueue(this);
}
catch (AMQException e)
@@ -1203,9 +1278,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
finally
{
CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
}
-
-
}
public boolean isRead()
@@ -1217,6 +1291,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return true;
}
+
+ public String toString()
+ {
+ return _name;
+ }
}
private class SubFlushRunner implements ReadWriteRunnable
@@ -1230,27 +1309,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void run()
{
- boolean complete = false;
- try
- {
- CurrentActor.set(_sub.getLogActor());
- complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
- }
- catch (AMQException e)
- {
- _logger.error(e);
+ String originalName = Thread.currentThread().getName();
+ try{
+ Thread.currentThread().setName("SubFlushRunner-"+_sub);
+
+ boolean complete = false;
+ try
+ {
+ CurrentActor.set(_sub.getLogActor());
+ complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ if (!complete && !_sub.isSuspended())
+ {
+ _asyncDelivery.execute(this);
+ }
}
finally
{
- CurrentActor.remove();
- }
- if (!complete && !_sub.isSuspended())
- {
- _asyncDelivery.execute(this);
+ Thread.currentThread().setName(originalName);
}
-
}
public boolean isRead()
@@ -1278,7 +1366,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
sub.getSendLock();
- atTail = attemptDelivery(sub);
+ atTail = attemptDelivery(sub);
if (atTail && sub.isAutoClose())
{
unregisterSubscription(sub);
@@ -1308,63 +1396,81 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return atTail;
}
+ /**
+ * Attempt delivery for the given subscription.
+ *
+ * Looks up the next node for the subscription and attempts to deliver it.
+ *
+ * @param sub
+ * @return true if we have completed all possible deliveries for this sub.
+ * @throws AMQException
+ */
private boolean attemptDelivery(Subscription sub) throws AMQException
{
boolean atTail = false;
boolean advanced = false;
- boolean subActive = sub.isActive();
+ boolean subActive = sub.isActive() && !sub.isSuspended();
if (subActive)
{
QueueEntry node = moveSubscriptionToNextNode(sub);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity());
+ }
if (!(node.isAcquired() || node.isDeleted()))
{
- if (!sub.isSuspended())
+ if (sub.hasInterest(node))
{
- if (sub.hasInterest(node))
+ if (!sub.wouldSuspend(node))
{
- if (!sub.wouldSuspend(node))
+ if (!sub.isBrowser() && !node.acquire(sub))
{
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
- }
- else
+ sub.restoreCredit(node);
+ }
+ else
+ {
+ deliverMessage(sub, node);
+
+ if (sub.isBrowser())
{
- deliverMessage(sub, node);
+ QueueEntry newNode = _entries.next(node);
- if (sub.isBrowser())
+ if (newNode != null)
{
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
+ advanced = true;
+ sub.setLastSeenEntry(node, newNode);
+ node = sub.getLastSeenEntry();
}
+
}
-
- }
- else // Not enough Credit for message and wouldSuspend
- {
- //QPID-1187 - Treat the subscription as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
- node.addStateChangeListener(new QueueEntryListener(sub, node));
}
+
}
- else
+ else // Not enough Credit for message and wouldSuspend
{
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
+ //QPID-1187 - Treat the subscription as suspended for this message
+ // and wait for the message to be removed to continue delivery.
+
+ // 2009-09-30 : MR : setting subActive = false only causes, this
+ // particular delivery attempt to end. This is called from
+ // flushSubscription and processQueue both of which attempt
+ // delivery a number of times. Won't a bytes limited
+ // subscriber with not enough credit for the next message
+ // create a lot of new QELs? How about a browser that calls
+ // this method LONG.MAX_LONG times!
+ subActive = false;
+ node.addStateChangeListener(new QueueEntryListener(sub, node));
+ }
+ }
+ else
+ {
+ // this subscription is not interested in this node so we can skip over it
+ QueueEntry newNode = _entries.next(node);
+ if (newNode != null)
+ {
+ sub.setLastSeenEntry(node, newNode);
}
}
-
}
atTail = (_entries.next(node) == null) && !advanced;
}
@@ -1409,6 +1515,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity()));
+ }
+
return node;
}
@@ -1423,6 +1535,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asynchronousRunner.compareAndSet(runner, null);
+ // For every message enqueue/requeue the we fire deliveryAsync() which
+ // increases _stateChangeCount. If _sCC changes whilst we are in our loop
+ // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
+ // then we will continue to run for a maximum of iterations.
+ // So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
{
// we want to have one extra loop after every subscription has reached the point where it cannot move
@@ -1442,20 +1559,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//iterate over the subscribers and try to advance their pointer
while (subscriptionIter.advance())
{
- boolean closeConsumer = false;
Subscription sub = subscriptionIter.getNode().getSubscription();
sub.getSendLock();
try
{
- if (sub != null)
- {
-
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null)
- {
- done = attemptDelivery(sub);
- }
- }
+ done = attemptDelivery(sub);
if (done)
{
if (extraLoops == 0)
@@ -1492,11 +1600,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rescheduling runner:" + runner);
+ }
_asyncDelivery.execute(runner);
}
}
- @Override
public void checkMessageStatus() throws AMQException
{
@@ -1604,6 +1715,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ public long getCapacity()
+ {
+ return _capacity;
+ }
+
+ public void setCapacity(long capacity)
+ {
+ _capacity = capacity;
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _flowResumeCapacity;
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ _flowResumeCapacity = flowResumeCapacity;
+ }
+
+
public Set<NotificationCheck> getNotificationChecks()
{
return _notificationChecks;
@@ -1673,6 +1805,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setMaximumMessageSize(config.getMaximumMessageSize());
setMaximumMessageCount(config.getMaximumMessageCount());
setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ _capacity = config.getCapacity();
+ _flowResumeCapacity = config.getFlowResumeCapacity();
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 9575bfa1ec..1affdd6590 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -102,7 +102,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
- instance.initialise();
+ instance.initialise(instanceID);
}
catch (Exception e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index a049d1eb09..831f928832 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -42,18 +42,22 @@ import java.io.File;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
+ private String _registryName;
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
super(new ServerConfiguration(configurationURL));
}
- public void initialise() throws Exception
+ public void initialise(int instanceID) throws Exception
{
_rootMessageLogger = new RootMessageLoggerImpl(_configuration,
new Log4jMessageLogger());
+
+ _registryName = String.valueOf(instanceID);
+
// Set the Actor for current log messages
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
+ CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));
@@ -83,7 +87,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void close() throws Exception
{
//Set the Actor for Broker Shutdown
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
+ CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
try
{
super.close();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index ddb3fce5d2..92accb3499 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -43,8 +43,9 @@ public interface IApplicationRegistry
* Initialise the application registry. All initialisation must be done in this method so that any components
* that need access to the application registry itself for initialisation are able to use it. Attempting to
* initialise in the constructor will lead to failures since the registry reference will not have been set.
+ * @param instanceID the instanceID that we can use to identify this AR.
*/
- void initialise() throws Exception;
+ void initialise(int instanceID) throws Exception;
/**
* Shutdown this Registry
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
index f852514444..d2bbb795e9 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.server.security.access;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.exchange.Exchange;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
public class PrincipalPermissions
{
@@ -41,6 +42,7 @@ public class PrincipalPermissions
private static final Object CREATE_QUEUES_KEY = new Object();
private static final Object CREATE_EXCHANGES_KEY = new Object();
+
private static final Object CREATE_QUEUE_TEMPORARY_KEY = new Object();
private static final Object CREATE_QUEUE_QUEUES_KEY = new Object();
private static final Object CREATE_QUEUE_EXCHANGES_KEY = new Object();
@@ -80,248 +82,257 @@ public class PrincipalPermissions
{
switch (permission)
{
- case ACCESS:
- break; // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS
- case BIND:
- break; // All the details are currently included in the create setup.
case CONSUME: // Parameters : AMQShortString queueName, Boolean Temporary, Boolean ownQueueOnly
- Map consumeRights = (Map) _permissions.get(permission);
-
- if (consumeRights == null)
- {
- consumeRights = new ConcurrentHashMap();
- _permissions.put(permission, consumeRights);
- }
-
- //if we have parametsre
- if (parameters.length > 0)
- {
- AMQShortString queueName = (AMQShortString) parameters[0];
- Boolean temporary = (Boolean) parameters[1];
- Boolean ownQueueOnly = (Boolean) parameters[2];
-
- if (temporary)
- {
- consumeRights.put(CONSUME_TEMPORARY_KEY, true);
- }
- else
- {
- consumeRights.put(CONSUME_TEMPORARY_KEY, false);
- }
-
- if (ownQueueOnly)
- {
- consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true);
- }
- else
- {
- consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false);
- }
-
-
- LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY);
- if (queues == null)
- {
- queues = new LinkedList();
- consumeRights.put(CONSUME_QUEUES_KEY, queues);
- }
-
- if (queueName != null)
- {
- queues.add(queueName);
- }
- }
-
-
+ grantConsume(permission, parameters);
break;
case CREATEQUEUE: // Parameters : Boolean temporary, AMQShortString queueName
// , AMQShortString exchangeName , AMQShortString routingKey
-
- Map createRights = (Map) _permissions.get(permission);
-
- if (createRights == null)
- {
- createRights = new ConcurrentHashMap();
- _permissions.put(permission, createRights);
-
- }
-
- //The existence of the empty map mean permission to all.
- if (parameters.length == 0)
- {
- return;
- }
-
- Boolean temporary = (Boolean) parameters[0];
-
- AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
- AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null;
- //Set the routingkey to the specified value or the queueName if present
- AMQShortString routingKey = (parameters.length > 3 && null != parameters[3]) ? (AMQShortString) parameters[3] : queueName;
-
- // Get the queues map
- Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
-
- if (create_queues == null)
- {
- create_queues = new ConcurrentHashMap();
- createRights.put(CREATE_QUEUES_KEY, create_queues);
- }
-
- //Allow all temp queues to be created
- create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary);
-
- //Create empty list of queues
- Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
-
- if (create_queues_queues == null)
- {
- create_queues_queues = new ConcurrentHashMap();
- create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues);
- }
-
- // We are granting CREATE rights to all temporary queues only
- if (parameters.length == 1)
- {
- return;
- }
-
- // if we have a queueName then we need to store any associated exchange / rk bindings
- if (queueName != null)
- {
- Map queue = (Map) create_queues_queues.get(queueName);
- if (queue == null)
- {
- queue = new ConcurrentHashMap();
- create_queues_queues.put(queueName, queue);
- }
-
- if (exchangeName != null)
- {
- queue.put(exchangeName, routingKey);
- }
-
- //If no exchange is specified then the presence of the queueName in the map says any exchange is ok
- }
-
- // Store the exchange that we are being granted rights to. This will be used as part of binding
-
- //Lookup the list of exchanges
- Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
-
- if (create_queues_exchanges == null)
- {
- create_queues_exchanges = new ConcurrentHashMap();
- create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges);
- }
-
- //if we have an exchange
- if (exchangeName != null)
- {
- //Retrieve the list of permitted exchanges.
- Map exchanges = (Map) create_queues_exchanges.get(exchangeName);
-
- if (exchanges == null)
- {
- exchanges = new ConcurrentHashMap();
- create_queues_exchanges.put(exchangeName, exchanges);
- }
-
- //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY
- exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary);
-
- //Store the binding details of queue/rk for this exchange.
- if (queueName != null)
- {
- //Retrieve the list of permitted routingKeys.
- Map rKeys = (Map) exchanges.get(exchangeName);
-
- if (rKeys == null)
- {
- rKeys = new ConcurrentHashMap();
- exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys);
- }
-
- rKeys.put(queueName, routingKey);
- }
- }
+ grantCreateQueue(permission, parameters);
break;
case CREATEEXCHANGE:
// Parameters AMQShortString exchangeName , AMQShortString Class
- Map rights = (Map) _permissions.get(permission);
- if (rights == null)
- {
- rights = new ConcurrentHashMap();
- _permissions.put(permission, rights);
- }
-
- Map create_exchanges = (Map) rights.get(CREATE_EXCHANGES_KEY);
- if (create_exchanges == null)
- {
- create_exchanges = new ConcurrentHashMap();
- rights.put(CREATE_EXCHANGES_KEY, create_exchanges);
- }
-
- //Should perhaps error if parameters[0] is null;
- AMQShortString name = parameters.length > 0 ? (AMQShortString) parameters[0] : null;
- AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : new AMQShortString("direct");
-
- //Store the exchangeName / class mapping if the mapping is null
- rights.put(name, className);
- break;
- case DELETE:
+ grantCreateExchange(permission, parameters);
break;
-
case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
- Map publishRights = (Map) _permissions.get(permission);
-
- if (publishRights == null)
- {
- publishRights = new ConcurrentHashMap();
- _permissions.put(permission, publishRights);
- }
-
- if (parameters == null || parameters.length == 0)
- {
- //If we have no parameters then allow publish to all destinations
- // this is signified by having a null value for publish_exchanges
- }
- else
- {
- Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
-
- if (publish_exchanges == null)
- {
- publish_exchanges = new ConcurrentHashMap();
- publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges);
- }
-
-
- HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]);
-
- // Check to see if we have a routing key
- if (parameters.length == 2)
- {
- if (routingKeys == null)
- {
- routingKeys = new HashSet<AMQShortString>();
- }
- //Add routing key to permitted publish destinations
- routingKeys.add(parameters[1]);
- }
-
- // Add the updated routingkey list or null if all values allowed
- publish_exchanges.put(parameters[0], routingKeys);
- }
+ grantPublish(permission, parameters);
break;
+ /* The other cases just fall through to no-op */
+ case DELETE:
+ case ACCESS: // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS
+ case BIND: // All the details are currently included in the create setup.
case PURGE:
- break;
case UNBIND:
break;
}
}
+ private void grantPublish(Permission permission, Object... parameters) {
+ Map publishRights = (Map) _permissions.get(permission);
+
+ if (publishRights == null)
+ {
+ publishRights = new ConcurrentHashMap();
+ _permissions.put(permission, publishRights);
+ }
+
+ if (parameters == null || parameters.length == 0)
+ {
+ //If we have no parameters then allow publish to all destinations
+ // this is signified by having a null value for publish_exchanges
+ }
+ else
+ {
+ Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+ if (publish_exchanges == null)
+ {
+ publish_exchanges = new ConcurrentHashMap();
+ publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges);
+ }
+
+
+ HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]);
+
+ // Check to see if we have a routing key
+ if (parameters.length == 2)
+ {
+ if (routingKeys == null)
+ {
+ routingKeys = new HashSet<AMQShortString>();
+ }
+ //Add routing key to permitted publish destinations
+ routingKeys.add(parameters[1]);
+ }
+
+ // Add the updated routingkey list or null if all values allowed
+ publish_exchanges.put(parameters[0], routingKeys);
+ }
+ }
+
+ private void grantCreateExchange(Permission permission, Object... parameters) {
+ Map rights = (Map) _permissions.get(permission);
+ if (rights == null)
+ {
+ rights = new ConcurrentHashMap();
+ _permissions.put(permission, rights);
+ }
+
+ Map create_exchanges = (Map) rights.get(CREATE_EXCHANGES_KEY);
+ if (create_exchanges == null)
+ {
+ create_exchanges = new ConcurrentHashMap();
+ rights.put(CREATE_EXCHANGES_KEY, create_exchanges);
+ }
+
+ //Should perhaps error if parameters[0] is null;
+ AMQShortString name = parameters.length > 0 ? (AMQShortString) parameters[0] : null;
+ AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : new AMQShortString("direct");
+
+ //Store the exchangeName / class mapping if the mapping is null
+ rights.put(name, className);
+ }
+
+ private void grantCreateQueue(Permission permission, Object... parameters) {
+ Map createRights = (Map) _permissions.get(permission);
+
+ if (createRights == null)
+ {
+ createRights = new ConcurrentHashMap();
+ _permissions.put(permission, createRights);
+
+ }
+
+ //The existence of the empty map mean permission to all.
+ if (parameters.length == 0)
+ {
+ return;
+ }
+
+ Boolean temporary = (Boolean) parameters[0];
+
+ AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
+ AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null;
+ //Set the routingkey to the specified value or the queueName if present
+ AMQShortString routingKey = (parameters.length > 3 && null != parameters[3]) ? (AMQShortString) parameters[3] : queueName;
+
+ // Get the queues map
+ Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+ if (create_queues == null)
+ {
+ create_queues = new ConcurrentHashMap();
+ createRights.put(CREATE_QUEUES_KEY, create_queues);
+ }
+
+ //Allow all temp queues to be created
+ create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary);
+
+ //Create empty list of queues
+ Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+ if (create_queues_queues == null)
+ {
+ create_queues_queues = new ConcurrentHashMap();
+ create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues);
+ }
+
+ // We are granting CREATE rights to all temporary queues only
+ if (parameters.length == 1)
+ {
+ return;
+ }
+
+ // if we have a queueName then we need to store any associated exchange / rk bindings
+ if (queueName != null)
+ {
+ Map queue = (Map) create_queues_queues.get(queueName);
+ if (queue == null)
+ {
+ queue = new ConcurrentHashMap();
+ create_queues_queues.put(queueName, queue);
+ }
+
+ if (exchangeName != null)
+ {
+ queue.put(exchangeName, routingKey);
+ }
+
+ //If no exchange is specified then the presence of the queueName in the map says any exchange is ok
+ }
+
+ // Store the exchange that we are being granted rights to. This will be used as part of binding
+
+ //Lookup the list of exchanges
+ Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+ if (create_queues_exchanges == null)
+ {
+ create_queues_exchanges = new ConcurrentHashMap();
+ create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges);
+ }
+
+ //if we have an exchange
+ if (exchangeName != null)
+ {
+ //Retrieve the list of permitted exchanges.
+ Map exchanges = (Map) create_queues_exchanges.get(exchangeName);
+
+ if (exchanges == null)
+ {
+ exchanges = new ConcurrentHashMap();
+ create_queues_exchanges.put(exchangeName, exchanges);
+ }
+
+ //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY
+ exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary);
+
+ //Store the binding details of queue/rk for this exchange.
+ if (queueName != null)
+ {
+ //Retrieve the list of permitted routingKeys.
+ Map rKeys = (Map) exchanges.get(exchangeName);
+
+ if (rKeys == null)
+ {
+ rKeys = new ConcurrentHashMap();
+ exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys);
+ }
+
+ rKeys.put(queueName, routingKey);
+ }
+ }
+ }
+
+ private void grantConsume(Permission permission, Object... parameters) {
+ Map consumeRights = (Map) _permissions.get(permission);
+
+ if (consumeRights == null)
+ {
+ consumeRights = new ConcurrentHashMap();
+ _permissions.put(permission, consumeRights);
+ }
+
+ //if we have parametsre
+ if (parameters.length > 0)
+ {
+ AMQShortString queueName = (AMQShortString) parameters[0];
+ Boolean temporary = (Boolean) parameters[1];
+ Boolean ownQueueOnly = (Boolean) parameters[2];
+
+ if (temporary)
+ {
+ consumeRights.put(CONSUME_TEMPORARY_KEY, true);
+ }
+ else
+ {
+ consumeRights.put(CONSUME_TEMPORARY_KEY, false);
+ }
+
+ if (ownQueueOnly)
+ {
+ consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true);
+ }
+ else
+ {
+ consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false);
+ }
+
+
+ LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY);
+ if (queues == null)
+ {
+ queues = new LinkedList();
+ consumeRights.put(CONSUME_QUEUES_KEY, queues);
+ }
+
+ if (queueName != null)
+ {
+ queues.add(queueName);
+ }
+ }
+ }
+
/**
*
* @param permission the type of permission to check
@@ -346,267 +357,286 @@ public class PrincipalPermissions
return AuthzResult.ALLOWED; // This is here for completeness but the SimpleXML ACLManager never calls it.
// The existence of this user specific PP can be validated in the map SimpleXML maintains.
case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey
-
- Exchange exchange = (Exchange) parameters[1];
-
- AMQQueue bind_queueName = (AMQQueue) parameters[2];
- AMQShortString routingKey = (AMQShortString) parameters[3];
-
- //Get all Create Rights for this user
- Map bindCreateRights = (Map) _permissions.get(Permission.CREATEQUEUE);
-
- //Look up the Queue Creation Rights
- Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY);
-
- //Lookup the list of queues
- Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY);
-
- // Check and see if we have a queue white list to check
- if (bind_create_queues_queues != null)
- {
- //There a white list for queues
- Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName);
-
- if (exchangeDetails == null) //Then all queue can be bound to all exchanges.
- {
- return AuthzResult.ALLOWED;
- }
-
- // Check to see if we have a white list of routingkeys to check
- Map rkeys = (Map) exchangeDetails.get(exchange.getName());
-
- // if keys is null then any rkey is allowed on this exchange
- if (rkeys == null)
- {
- // There is no routingkey white list
- return AuthzResult.ALLOWED;
- }
- else
- {
- // We have routingKeys so a match must be found to allowed binding
- Iterator keys = rkeys.keySet().iterator();
-
- boolean matched = false;
- while (keys.hasNext() && !matched)
- {
- AMQShortString rkey = (AMQShortString) keys.next();
- if (rkey.endsWith("*"))
- {
- matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString());
- }
- else
- {
- matched = routingKey.equals(rkey);
- }
- }
-
-
- return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
-
-
- }
- else
- {
- //There a is no white list for queues
-
- // So can allow all queues to be bound
- // but we should first check and see if we have a temp queue and validate that we are allowed
- // to bind temp queues.
-
- //Check to see if we have a temporary queue
- if (bind_queueName.isAutoDelete())
- {
- // Check and see if we have an exchange white list.
- Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
-
- // If the exchange exists then we must check to see if temporary queues are allowed here
- if (bind_exchanges != null)
- {
- // Check to see if the requested exchange is allowed.
- Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName());
-
- return ((Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
-
- //no white list so all allowed, drop through to return true below.
- }
-
- // not a temporary queue and no white list so all allowed.
- return AuthzResult.ALLOWED;
- }
-
+ return authoriseBind(parameters);
case CREATEQUEUE:// Parameters : boolean autodelete, AMQShortString name
-
- Map createRights = (Map) _permissions.get(permission);
-
- // If there are no create rights then deny request
- if (createRights == null)
- {
- return AuthzResult.DENIED;
- }
-
- //Look up the Queue Creation Rights
- Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
-
- //Lookup the list of queues allowed to be created
- Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
-
-
- AMQShortString queueName = (AMQShortString) parameters[1];
- Boolean autoDelete = (Boolean) parameters[0];
-
- if (autoDelete)// we have a temporary queue
- {
- return ((Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- else
- {
- // If there is a white list then check
- if (create_queues_queues == null || create_queues_queues.containsKey(queueName))
- {
- return AuthzResult.ALLOWED;
- }
- else
- {
- return AuthzResult.DENIED;
- }
-
- }
+ return authoriseCreateQueue(permission, parameters);
case CREATEEXCHANGE:
- Map rights = (Map) _permissions.get(permission);
-
- AMQShortString exchangeName = (AMQShortString) parameters[0];
-
- // If the exchange list is doesn't exist then all is allowed else
- // check the valid exchanges
- if (rights == null || rights.containsKey(exchangeName))
- {
- return AuthzResult.ALLOWED;
- }
- else
- {
- return AuthzResult.DENIED;
- }
+ return authoriseCreateExchange(permission, parameters);
case CONSUME: // Parameters : AMQQueue
-
- if (parameters.length == 1 && parameters[0] instanceof AMQQueue)
- {
- AMQQueue queue = ((AMQQueue) parameters[0]);
- Map queuePermissions = (Map) _permissions.get(permission);
-
- List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY);
-
- Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY);
- Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY);
-
- // If user is allowed to publish to temporary queues and this is a temp queue then allow it.
- if (temporayQueues)
- {
- if (queue.isAutoDelete())
- // This will allow consumption from any temporary queue including ones not owned by this user.
- // Of course the exclusivity will not be broken.
- {
- // if not limited to ownQueuesOnly then ok else check queue Owner.
- return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- else
- {
- return AuthzResult.DENIED;
- }
- }
-
- // if queues are white listed then ensure it is ok
- if (queues != null)
- {
- // if no queues are listed then ALL are ok othereise it must be specified.
- if (ownQueuesOnly)
- {
- if (queue.getOwner().equals(_user))
- {
- return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- else
- {
- return AuthzResult.DENIED;
- }
- }
-
- // If we are
- return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- }
-
- // Can't authenticate without the right parameters
- return AuthzResult.DENIED;
- case DELETE:
- break;
-
+ return authoriseConsume(permission, parameters);
case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
- Map publishRights = (Map) _permissions.get(permission);
-
- if (publishRights == null)
- {
- return AuthzResult.DENIED;
- }
-
- Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
-
- // Having no exchanges listed gives full publish rights to all exchanges
- if (exchanges == null)
- {
- return AuthzResult.ALLOWED;
- }
- // Otherwise exchange must be listed in the white list
-
- // If the map doesn't have the exchange then it isn't allowed
- if (!exchanges.containsKey(((Exchange) parameters[0]).getName()))
- {
- return AuthzResult.DENIED;
- }
- else
- {
-
- // Get valid routing keys
- HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName());
-
- // Having no routingKeys in the map then all are allowed.
- if (routingKeys == null)
- {
- return AuthzResult.ALLOWED;
- }
- else
- {
- // We have routingKeys so a match must be found to allowed binding
- Iterator keys = routingKeys.iterator();
-
-
- AMQShortString publishRKey = (AMQShortString)parameters[1];
-
- boolean matched = false;
- while (keys.hasNext() && !matched)
- {
- AMQShortString rkey = (AMQShortString) keys.next();
-
- if (rkey.endsWith("*"))
- {
- matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1));
- }
- else
- {
- matched = publishRKey.equals(rkey);
- }
- }
- return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
- }
- }
+ return authorisePublish(permission, parameters);
+ /* Fall through */
+ case DELETE:
case PURGE:
- break;
case UNBIND:
- break;
-
+ default:
+ return AuthzResult.DENIED;
}
- return AuthzResult.DENIED;
}
+
+ private AuthzResult authoriseConsume(Permission permission, Object... parameters) {
+ if (parameters.length == 1 && parameters[0] instanceof AMQQueue)
+ {
+ AMQQueue queue = ((AMQQueue) parameters[0]);
+ Map queuePermissions = (Map) _permissions.get(permission);
+
+ if (queuePermissions == null)
+ {
+ //if the outer map is null, the user has no CONSUME rights at all
+ return AuthzResult.DENIED;
+ }
+
+ List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY);
+
+ Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY);
+ Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY);
+
+ // If user is allowed to publish to temporary queues and this is a temp queue then allow it.
+ if (temporayQueues)
+ {
+ if (queue.isAutoDelete())
+ // This will allow consumption from any temporary queue including ones not owned by this user.
+ // Of course the exclusivity will not be broken.
+ {
+ // if not limited to ownQueuesOnly then ok else check queue Owner.
+ return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ else
+ {
+ return AuthzResult.DENIED;
+ }
+ }
+
+ // if queues are white listed then ensure it is ok
+ if (queues != null)
+ {
+ // if no queues are listed then ALL are ok othereise it must be specified.
+ if (ownQueuesOnly)
+ {
+ if (queue.getOwner().equals(_user))
+ {
+ return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ else
+ {
+ return AuthzResult.DENIED;
+ }
+ }
+
+ // If we are
+ return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ }
+
+ // Can't authenticate without the right parameters
+ return AuthzResult.DENIED;
+ }
+
+ private AuthzResult authorisePublish(Permission permission, Object... parameters) {
+ Map publishRights = (Map) _permissions.get(permission);
+
+ if (publishRights == null)
+ {
+ return AuthzResult.DENIED;
+ }
+
+ Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+ // Having no exchanges listed gives full publish rights to all exchanges
+ if (exchanges == null)
+ {
+ return AuthzResult.ALLOWED;
+ }
+ // Otherwise exchange must be listed in the white list
+
+ // If the map doesn't have the exchange then it isn't allowed
+ if (!exchanges.containsKey(((Exchange) parameters[0]).getName()))
+ {
+ return AuthzResult.DENIED;
+ }
+ else
+ {
+
+ // Get valid routing keys
+ HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName());
+
+ // Having no routingKeys in the map then all are allowed.
+ if (routingKeys == null)
+ {
+ return AuthzResult.ALLOWED;
+ }
+ else
+ {
+ // We have routingKeys so a match must be found to allowed binding
+ Iterator keys = routingKeys.iterator();
+
+
+ AMQShortString publishRKey = (AMQShortString)parameters[1];
+
+ boolean matched = false;
+ while (keys.hasNext() && !matched)
+ {
+ AMQShortString rkey = (AMQShortString) keys.next();
+
+ if (rkey.endsWith("*"))
+ {
+ matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1));
+ }
+ else
+ {
+ matched = publishRKey.equals(rkey);
+ }
+ }
+ return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ }
+ }
+
+ private AuthzResult authoriseCreateExchange(Permission permission, Object... parameters) {
+ Map rights = (Map) _permissions.get(permission);
+
+ AMQShortString exchangeName = (AMQShortString) parameters[0];
+
+ // If the exchange list is doesn't exist then all is allowed else
+ // check the valid exchanges
+ if (rights == null || rights.containsKey(exchangeName))
+ {
+ return AuthzResult.ALLOWED;
+ }
+ else
+ {
+ return AuthzResult.DENIED;
+ }
+ }
+
+ private AuthzResult authoriseCreateQueue(Permission permission, Object... parameters) {
+ Map createRights = (Map) _permissions.get(permission);
+
+ // If there are no create rights then deny request
+ if (createRights == null)
+ {
+ return AuthzResult.DENIED;
+ }
+
+ //Look up the Queue Creation Rights
+ Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+ //Lookup the list of queues allowed to be created
+ Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+
+ AMQShortString queueName = (AMQShortString) parameters[1];
+ Boolean autoDelete = (Boolean) parameters[0];
+
+ if (autoDelete)// we have a temporary queue
+ {
+ return ((Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+ else
+ {
+ // If there is a white list then check
+ if (create_queues_queues == null || create_queues_queues.containsKey(queueName))
+ {
+ return AuthzResult.ALLOWED;
+ }
+ else
+ {
+ return AuthzResult.DENIED;
+ }
+
+ }
+ }
+
+ private AuthzResult authoriseBind(Object... parameters) {
+ Exchange exchange = (Exchange) parameters[1];
+
+ AMQQueue bind_queueName = (AMQQueue) parameters[2];
+ AMQShortString routingKey = (AMQShortString) parameters[3];
+
+ //Get all Create Rights for this user
+ Map bindCreateRights = (Map) _permissions.get(Permission.CREATEQUEUE);
+
+ //Look up the Queue Creation Rights
+ Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY);
+
+ //Lookup the list of queues
+ Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY);
+
+ // Check and see if we have a queue white list to check
+ if (bind_create_queues_queues != null)
+ {
+ //There a white list for queues
+ Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName);
+
+ if (exchangeDetails == null) //Then all queue can be bound to all exchanges.
+ {
+ return AuthzResult.ALLOWED;
+ }
+
+ // Check to see if we have a white list of routingkeys to check
+ Map rkeys = (Map) exchangeDetails.get(exchange.getName());
+
+ // if keys is null then any rkey is allowed on this exchange
+ if (rkeys == null)
+ {
+ // There is no routingkey white list
+ return AuthzResult.ALLOWED;
+ }
+ else
+ {
+ // We have routingKeys so a match must be found to allowed binding
+ Iterator keys = rkeys.keySet().iterator();
+
+ boolean matched = false;
+ while (keys.hasNext() && !matched)
+ {
+ AMQShortString rkey = (AMQShortString) keys.next();
+ if (rkey.endsWith("*"))
+ {
+ matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString());
+ }
+ else
+ {
+ matched = routingKey.equals(rkey);
+ }
+ }
+
+
+ return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+
+
+ }
+ else
+ {
+ //There a is no white list for queues
+
+ // So can allow all queues to be bound
+ // but we should first check and see if we have a temp queue and validate that we are allowed
+ // to bind temp queues.
+
+ //Check to see if we have a temporary queue
+ if (bind_queueName.isAutoDelete())
+ {
+ // Check and see if we have an exchange white list.
+ Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+ // If the exchange exists then we must check to see if temporary queues are allowed here
+ if (bind_exchanges != null)
+ {
+ // Check to see if the requested exchange is allowed.
+ Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName());
+
+ return ((Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ }
+
+ //no white list so all allowed, drop through to return true below.
+ }
+
+ // not a temporary queue and no white list so all allowed.
+ return AuthzResult.ALLOWED;
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
index a6fae053c2..ca6b68dafa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
@@ -35,28 +35,24 @@ public abstract class BasicACLPlugin implements ACLPlugin
// Returns true or false if the plugin should authorise or deny the request
protected abstract AuthzResult getResult();
- @Override
public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch,
AMQQueue queue, AMQShortString routingKey)
{
return getResult();
}
- @Override
public AuthzResult authoriseConnect(AMQProtocolSession session,
VirtualHost virtualHost)
{
return getResult();
}
- @Override
public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck,
AMQQueue queue)
{
return getResult();
}
- @Override
public AuthzResult authoriseConsume(AMQProtocolSession session,
boolean exclusive, boolean noAck, boolean noLocal, boolean nowait,
AMQQueue queue)
@@ -64,7 +60,6 @@ public abstract class BasicACLPlugin implements ACLPlugin
return getResult();
}
- @Override
public AuthzResult authoriseCreateExchange(AMQProtocolSession session,
boolean autoDelete, boolean durable, AMQShortString exchangeName,
boolean internal, boolean nowait, boolean passive,
@@ -73,7 +68,6 @@ public abstract class BasicACLPlugin implements ACLPlugin
return getResult();
}
- @Override
public AuthzResult authoriseCreateQueue(AMQProtocolSession session,
boolean autoDelete, boolean durable, boolean exclusive,
boolean nowait, boolean passive, AMQShortString queue)
@@ -81,19 +75,16 @@ public abstract class BasicACLPlugin implements ACLPlugin
return getResult();
}
- @Override
public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
{
return getResult();
}
- @Override
public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
{
return getResult();
}
- @Override
public AuthzResult authorisePublish(AMQProtocolSession session,
boolean immediate, boolean mandatory, AMQShortString routingKey,
Exchange e)
@@ -101,20 +92,17 @@ public abstract class BasicACLPlugin implements ACLPlugin
return getResult();
}
- @Override
public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue)
{
return getResult();
}
- @Override
public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch,
AMQShortString routingKey, AMQQueue queue)
{
return getResult();
}
- @Override
public void setConfiguration(Configuration config)
{
// no-op
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
index 3a81932123..7450322130 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
@@ -211,7 +211,6 @@ public class FirewallPlugin extends AbstractACLPlugin
}
- @Override
public void setConfiguration(Configuration config) throws ConfigurationException
{
// Get default action
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
index 4efe381a8b..8658101cd8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
@@ -42,7 +42,6 @@ public class PropertiesPrincipalDatabaseManager implements PrincipalDatabaseMana
return _databases;
}
- @Override
public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException
{
//todo
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 72d6afc65c..2893e916cc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -630,11 +630,19 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public QueueEntry getLastSeenEntry()
{
- return _queueContext.get();
+ QueueEntry entry = _queueContext.get();
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(_logActor + ": lastSeenEntry: " + (entry == null ? "null" : entry.debugIdentity()));
+ }
+
+ return entry;
}
public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
{
+ _logger.debug(debugIdentity() + " Setting Last Seen To:" + (newvalue == null ? "nullNV" : newvalue.debugIdentity()));
return _queueContext.compareAndSet(expected,newvalue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 3c71282c57..450852cef7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -97,6 +97,7 @@ public class LocalTransactionalContext implements TransactionalContext
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+ _queue.checkCapacity(_channel);
if(entry.immediateAndNotDelivered())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 28af36e3db..10d6021d27 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -91,6 +91,8 @@ public class NonTransactionalContext implements TransactionalContext
public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
QueueEntry entry = queue.enqueue(_storeContext, message);
+ queue.checkCapacity(_channel);
+
//following check implements the functionality
//required by the 'immediate' flag: