summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/grammar/SelectorParser.jj17
-rw-r--r--java/broker/src/main/java/log4j.properties2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java78
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java346
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java27
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java223
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java33
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java198
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java98
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java97
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java75
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java151
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java89
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java (renamed from java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java)70
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java41
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java183
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java (renamed from java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java)71
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java108
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java579
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java (renamed from java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java)3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java (renamed from java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java)2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java57
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java342
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java130
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java568
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java128
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java7
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java7
70 files changed, 2744 insertions, 1522 deletions
diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj
index adec1b348d..f6a843e080 100644
--- a/java/broker/src/main/grammar/SelectorParser.jj
+++ b/java/broker/src/main/grammar/SelectorParser.jj
@@ -172,6 +172,7 @@ TOKEN [IGNORE_CASE] :
TOKEN [IGNORE_CASE] :
{
< ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* >
+ | < QUOTED_ID : "\"" ( ("\"\"") | ~["\""] )* "\"" >
}
// ----------------------------------------------------------------------------
@@ -589,6 +590,7 @@ String stringLitteral() :
PropertyExpression variable() :
{
Token t;
+ StringBuffer rc = new StringBuffer();
PropertyExpression left=null;
}
{
@@ -597,6 +599,21 @@ PropertyExpression variable() :
{
left = new PropertyExpression(t.image);
}
+ |
+ t = <QUOTED_ID>
+ {
+ // Decode the sting value.
+ String image = t.image;
+ for( int i=1; i < image.length()-1; i++ ) {
+ char c = image.charAt(i);
+ if( c == '"' )
+ i++;
+ rc.append(c);
+ }
+ return new PropertyExpression(rc.toString());
+ }
+
+
)
{
return left;
diff --git a/java/broker/src/main/java/log4j.properties b/java/broker/src/main/java/log4j.properties
index 87f04f4991..6788c65463 100644
--- a/java/broker/src/main/java/log4j.properties
+++ b/java/broker/src/main/java/log4j.properties
@@ -19,6 +19,6 @@
log4j.rootCategory=${amqj.logging.level}, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=info
+log4j.appender.console.Threshold=all
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 4696ec4453..17b4fa5d65 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -33,8 +33,9 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.NoRouteException;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.MessageStore;
@@ -42,17 +43,15 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.configuration.Configurator;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
public class AMQChannel
{
@@ -74,7 +73,7 @@ public class AMQChannel
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out
*/
- private AtomicLong _deliveryTag = new AtomicLong(0);
+ private long _deliveryTag = 0;
/** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
private AMQQueue _defaultQueue;
@@ -90,7 +89,7 @@ public class AMQChannel
private AMQMessage _currentMessage;
/** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
+ private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -98,8 +97,6 @@ public class AMQChannel
private final AtomicBoolean _suspended = new AtomicBoolean(false);
- private final MessageRouter _exchanges;
-
private TransactionalContext _txnContext, _nonTransactedContext;
/**
@@ -119,11 +116,11 @@ public class AMQChannel
private boolean _closing;
@Configured(path = "advanced.enableJMSXUserID",
- defaultValue = "true")
+ defaultValue = "false")
public boolean ENABLE_JMSXUserID;
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
+ public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
//Set values from configuration
@@ -135,7 +132,7 @@ public class AMQChannel
_prefetch_HighWaterMark = DEFAULT_PREFETCH;
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
- _exchanges = exchanges;
+
// by default the session is non-transactional
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
@@ -199,11 +196,12 @@ public class AMQChannel
_prefetch_HighWaterMark = prefetchCount;
}
- public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
+ public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException
{
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
_currentMessage.setPublisher(publisher);
+ _currentMessage.setExchange(e);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
@@ -215,9 +213,9 @@ public class AMQChannel
}
else
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
+ _log.debug(debugIdentity() + "Content header received on channel " + _channelId);
}
if (ENABLE_JMSXUserID)
@@ -252,9 +250,9 @@ public class AMQChannel
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
}
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "Content body received on channel " + _channelId);
+ _log.debug(debugIdentity() + "Content body received on channel " + _channelId);
}
try
@@ -285,7 +283,7 @@ public class AMQChannel
{
try
{
- _exchanges.routeContent(_currentMessage);
+ _currentMessage.route();
}
catch (NoRouteException e)
{
@@ -295,7 +293,7 @@ public class AMQChannel
public long getNextDeliveryTag()
{
- return _deliveryTag.incrementAndGet();
+ return ++_deliveryTag;
}
public int getNextConsumerTag()
@@ -333,13 +331,32 @@ public class AMQChannel
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
+ // We add before we register as the Async Delivery process may AutoClose the subscriber
+ // so calling _cT2QM.remove before we have done put which was after the register succeeded.
+ // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
_consumerTag2QueueMap.put(tag, queue);
+ try
+ {
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
+ }
+ catch (AMQException e)
+ {
+ _consumerTag2QueueMap.remove(tag);
+ throw e;
+ }
+
return tag;
}
- public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
+ /**
+ * Unsubscribe a consumer from a queue.
+ * @param session
+ * @param consumerTag
+ * @return true if the consumerTag had a mapped queue that could be unregistered.
+ * @throws AMQException
+ */
+ public boolean unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
{
if (_log.isDebugEnabled())
{
@@ -364,7 +381,9 @@ public class AMQChannel
if (q != null)
{
q.unregisterProtocolSession(session, _channelId, consumerTag);
+ return true;
}
+ return false;
}
/**
@@ -822,7 +841,7 @@ public class AMQChannel
{
message.discard(_storeContext);
message.setQueueDeleted(true);
-
+
}
catch (AMQException e)
{
@@ -967,16 +986,19 @@ public class AMQChannel
public void processReturns(AMQProtocolSession session) throws AMQException
{
- for (RequiredDeliveryException bouncedMessage : _returnMessages)
+ if (!_returnMessages.isEmpty())
{
- AMQMessage message = bouncedMessage.getAMQMessage();
- session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ for (RequiredDeliveryException bouncedMessage : _returnMessages)
+ {
+ AMQMessage message = bouncedMessage.getAMQMessage();
+ session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
- message.decrementReference(_storeContext);
- }
+ message.decrementReference(_storeContext);
+ }
- _returnMessages.clear();
+ _returnMessages.clear();
+ }
}
public boolean wouldSuspend(AMQMessage msg)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index ab9f40b31d..d8a8cfb6d1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -35,6 +35,7 @@ import org.apache.log4j.xml.DOMConfigurator;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.FixedSizeByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQException;
@@ -275,7 +276,7 @@ public class Main
// once more testing of the performance of the simple allocator has been done
if (!connectorConfig.enablePooledAllocator)
{
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
}
int port = connectorConfig.port;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index ac29998c2a..c62a7880a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -100,10 +100,9 @@ public class TxAck implements TxnOp
//make persistent changes, i.e. dequeue and decrementReference
for (UnacknowledgedMessage msg : _unacked)
{
- //msg.restoreTransientMessageData();
-
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
+
}
}
@@ -115,7 +114,6 @@ public class TxAck implements TxnOp
//in memory (persistent changes will be rolled back by store)
for (UnacknowledgedMessage msg : _unacked)
{
- msg.clearTransientMessageData();
msg.getMessage().takeReference();
}
}
@@ -124,11 +122,6 @@ public class TxAck implements TxnOp
{
//remove the unacked messages from the channels map
_map.remove(_unacked);
- for (UnacknowledgedMessage msg : _unacked)
- {
- msg.clearTransientMessageData();
- }
-
}
public void rollback(StoreContext storeContext)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index 40f5970cac..df7cecc940 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -68,16 +68,6 @@ public class UnacknowledgedMessage
entry.getMessage().decrementReference(storeContext);
}
- public void restoreTransientMessageData() throws AMQException
- {
- entry.getMessage().restoreTransientMessageData();
- }
-
- public void clearTransientMessageData()
- {
- entry.getMessage().clearTransientMessageData();
- }
-
public AMQMessage getMessage()
{
return entry.getMessage();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index b6b6ee39ce..12347c0278 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -212,10 +212,9 @@ public class DestNameExchange extends AbstractExchange
_logger.debug("Publishing message to queue " + queues);
}
- for (AMQQueue q : queues)
- {
- payload.enqueue(q);
- }
+ payload.enqueue(queues);
+
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 75be86a387..6fa3686152 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -26,6 +26,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortStringTokenizer;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
@@ -40,11 +41,7 @@ import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -84,12 +81,21 @@ public class DestWildExchange extends AbstractExchange
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
- private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
+ private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
- // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
- private static final String TOPIC_SEPARATOR = ".";
- private static final String AMQP_STAR = "*";
- private static final String AMQP_HASH = "#";
+
+ private static final byte TOPIC_SEPARATOR = (byte)'.';
+ private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
+ private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
+ private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
+ private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized =
+ new ConcurrentHashMap<AMQShortString, AMQShortString[]>();
+ private static final byte HASH_BYTE = (byte)'#';
+ private static final byte STAR_BYTE = (byte)'*';
/** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */
@MBeanDescription("Management Bean for Topic Exchange")
@@ -107,7 +113,7 @@ public class DestWildExchange extends AbstractExchange
public TabularData bindings() throws OpenDataException
{
_bindingList = new TabularDataSupport(_bindinglistDataType);
- for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _routingKey2queues.entrySet())
+ for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet())
{
AMQShortString key = entry.getKey();
List<String> queueList = new ArrayList<String>();
@@ -156,27 +162,75 @@ public class DestWildExchange extends AbstractExchange
assert queue != null;
assert rKey != null;
- AMQShortString routingKey = normalize(rKey);
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey);
- _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
// we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
- List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
+ List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
+
+
+
+
+
+
+
// if we got null back, no previous value was associated with the specified routing key hence
// we need to read back the new value just put into the map
if (queueList == null)
{
- queueList = _routingKey2queues.get(routingKey);
+ queueList = _bindingKey2queues.get(rKey);
}
+
+
if (!queueList.contains(queue))
{
queueList.add(queue);
+
+
+ if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ {
+ AMQShortString routingKey = normalize(rKey);
+ List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
+
+ if(queueList2 == null)
+ {
+ queueList2 = _wildCardBindingKey2queues.get(routingKey);
+ AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR);
+
+ ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens());
+
+ while (keyTok.hasMoreTokens())
+ {
+ keyTokList.add(keyTok.nextToken());
+ }
+
+ _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()]));
+ }
+ queueList2.add(queue);
+
+ }
+ else
+ {
+ List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
+ if(queueList2 == null)
+ {
+ queueList2 = _simpleBindingKey2queues.get(rKey);
+ }
+ queueList2.add(queue);
+
+ }
+
+
+
+
}
else if (_logger.isDebugEnabled())
{
- _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+ _logger.debug("Queue " + queue + " is already registered with routing key " + rKey);
}
+
+
}
private AMQShortString normalize(AMQShortString routingKey)
@@ -186,60 +240,55 @@ public class DestWildExchange extends AbstractExchange
routingKey = AMQShortString.EMPTY_STRING;
}
- StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
- List<String> _subscription = new ArrayList<String>();
+ AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
+
+ List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
while (routingTokens.hasMoreTokens())
{
- _subscription.add(routingTokens.nextToken());
+ subscriptionList.add(routingTokens.nextToken());
}
- int size = _subscription.size();
+ int size = subscriptionList.size();
for (int index = 0; index < size; index++)
{
// if there are more levels
if ((index + 1) < size)
{
- if (_subscription.get(index).equals(AMQP_HASH))
+ if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN))
{
- if (_subscription.get(index + 1).equals(AMQP_HASH))
+ if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN))
{
// we don't need #.# delete this one
- _subscription.remove(index);
+ subscriptionList.remove(index);
size--;
// redo this normalisation
index--;
}
- if (_subscription.get(index + 1).equals(AMQP_STAR))
+ if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN))
{
// we don't want #.* swap to *.#
// remove it and put it in at index + 1
- _subscription.add(index + 1, _subscription.remove(index));
+ subscriptionList.add(index + 1, subscriptionList.remove(index));
}
}
} // if we have more levels
}
- StringBuilder sb = new StringBuilder();
- for (String s : _subscription)
- {
- sb.append(s);
- sb.append(TOPIC_SEPARATOR);
- }
- sb.deleteCharAt(sb.length() - 1);
+ AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
- return new AMQShortString(sb.toString());
+ return normalizedString;
}
public void route(AMQMessage payload) throws AMQException
{
MessagePublishInfo info = payload.getMessagePublishInfo();
- final AMQShortString routingKey = normalize(info.getRoutingKey());
+ final AMQShortString routingKey = info.getRoutingKey();
List<AMQQueue> queues = getMatchedQueues(routingKey);
// if we have no registered queues we have nothing to do
@@ -254,19 +303,14 @@ public class DestWildExchange extends AbstractExchange
else
{
_logger.warn("No queues found for routing key " + routingKey);
- _logger.warn("Routing map contains: " + _routingKey2queues);
+ _logger.warn("Routing map contains: " + _bindingKey2queues);
return;
}
}
- for (AMQQueue q : queues)
- {
- // TODO: modify code generator to add clone() method then clone the deliver body
- // without this addition we have a race condition - we will be modifying the body
- // before the encoder has encoded the body for delivery
- payload.enqueue(q);
- }
+ payload.enqueue(queues);
+
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
@@ -276,21 +320,21 @@ public class DestWildExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
+ List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
return (queues != null) && queues.contains(queue);
}
public boolean isBound(AMQShortString routingKey)
{
- List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
+ List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
return (queues != null) && !queues.isEmpty();
}
public boolean isBound(AMQQueue queue)
{
- for (List<AMQQueue> queues : _routingKey2queues.values())
+ for (List<AMQQueue> queues : _bindingKey2queues.values())
{
if (queues.contains(queue))
{
@@ -303,7 +347,7 @@ public class DestWildExchange extends AbstractExchange
public boolean hasBindings()
{
- return !_routingKey2queues.isEmpty();
+ return !_bindingKey2queues.isEmpty();
}
public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
@@ -311,13 +355,11 @@ public class DestWildExchange extends AbstractExchange
assert queue != null;
assert rKey != null;
- AMQShortString routingKey = normalize(rKey);
-
- List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ List<AMQQueue> queues = _bindingKey2queues.get(rKey);
if (queues == null)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + routingKey + ". No queue was registered with that _routing key");
+ + " with routing key " + rKey + ". No queue was registered with that _routing key");
}
@@ -325,12 +367,39 @@ public class DestWildExchange extends AbstractExchange
if (!removedQ)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + routingKey);
+ + " with routing key " + rKey);
}
+
+ if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ {
+ AMQShortString bindingKey = normalize(rKey);
+ List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey);
+ queues2.remove(queue);
+ if(queues2.isEmpty())
+ {
+ _wildCardBindingKey2queues.remove(bindingKey);
+ _bindingKey2Tokenized.remove(bindingKey);
+ }
+
+ }
+ else
+ {
+ List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey);
+ queues2.remove(queue);
+ if(queues2.isEmpty())
+ {
+ _simpleBindingKey2queues.remove(rKey);
+ }
+
+ }
+
+
+
+
if (queues.isEmpty())
{
- _routingKey2queues.remove(routingKey);
+ _bindingKey2queues.remove(rKey);
}
}
@@ -349,117 +418,162 @@ public class DestWildExchange extends AbstractExchange
public Map<AMQShortString, List<AMQQueue>> getBindings()
{
- return _routingKey2queues;
+ return _bindingKey2queues;
}
private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
{
- List<AMQQueue> list = new LinkedList<AMQQueue>();
- StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
- ArrayList<String> routingkeyList = new ArrayList<String>();
+ List<AMQQueue> list = null;
- while (routingTokens.hasMoreTokens())
+ if(!_wildCardBindingKey2queues.isEmpty())
{
- String next = routingTokens.nextToken();
- if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH))
- {
- continue;
- }
- routingkeyList.add(next);
- }
- for (AMQShortString queue : _routingKey2queues.keySet())
- {
- StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR);
+ AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
+
+ final int routingTokensCount = routingTokens.countTokens();
+
- ArrayList<String> queueList = new ArrayList<String>();
+ AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount];
- while (queTok.hasMoreTokens())
+ if(routingTokensCount == 1)
{
- queueList.add(queTok.nextToken());
+ routingkeyTokens[0] =routingKey;
}
+ else
+ {
- int depth = 0;
- boolean matching = true;
- boolean done = false;
- int routingskip = 0;
- int queueskip = 0;
- while (matching && !done)
- {
- if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip)))
+ int token = 0;
+ while (routingTokens.hasMoreTokens())
{
- done = true;
- // if it was the routing key that ran out of digits
- if (routingkeyList.size() == (depth + routingskip))
- {
- if (queueList.size() > (depth + queueskip))
- { // a hash and it is the last entry
- matching =
- queueList.get(depth + queueskip).equals(AMQP_HASH)
- && (queueList.size() == (depth + queueskip + 1));
- }
- }
- else if (routingkeyList.size() > (depth + routingskip))
- {
- // There is still more routing key to check
- matching = false;
- }
+ AMQShortString next = routingTokens.nextToken();
- continue;
+ routingkeyTokens[token++] = next;
}
+ }
+ for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet())
+ {
+
+ AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey);
+
+
+ boolean matching = true;
+ boolean done = false;
- // if the values on the two topics don't match
- if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip)))
+ int depthPlusRoutingSkip = 0;
+ int depthPlusQueueSkip = 0;
+
+ final int bindingKeyTokensCount = bindingKeyTokens.length;
+
+ while (matching && !done)
{
- if (queueList.get(depth + queueskip).equals(AMQP_STAR))
+
+ if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip))
{
- depth++;
+ done = true;
+
+ // if it was the routing key that ran out of digits
+ if (routingTokensCount == depthPlusRoutingSkip)
+ {
+ if (bindingKeyTokensCount > depthPlusQueueSkip)
+ { // a hash and it is the last entry
+ matching =
+ bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)
+ && (bindingKeyTokensCount == (depthPlusQueueSkip + 1));
+ }
+ }
+ else if (routingTokensCount > depthPlusRoutingSkip)
+ {
+ // There is still more routing key to check
+ matching = false;
+ }
continue;
}
- else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
+
+ // if the values on the two topics don't match
+ if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip]))
{
- // Is this a # at the end
- if (queueList.size() == (depth + queueskip + 1))
+ if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN))
{
- done = true;
+ depthPlusQueueSkip++;
+ depthPlusRoutingSkip++;
continue;
}
-
- // otherwise # in the middle
- while (routingkeyList.size() > (depth + routingskip))
+ else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN))
{
- if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
+ // Is this a # at the end
+ if (bindingKeyTokensCount == (depthPlusQueueSkip + 1))
+ {
+ done = true;
+
+ continue;
+ }
+
+ // otherwise # in the middle
+ while (routingTokensCount > depthPlusRoutingSkip)
{
- queueskip++;
- depth++;
+ if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1]))
+ {
+ depthPlusQueueSkip += 2;
+ depthPlusRoutingSkip++;
+
+ break;
+ }
- break;
+ depthPlusRoutingSkip++;
}
- routingskip++;
+ continue;
}
- continue;
+ matching = false;
}
- matching = false;
+ depthPlusQueueSkip++;
+ depthPlusRoutingSkip++;
}
- depth++;
+ if (matching)
+ {
+ if(list == null)
+ {
+ list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey));
+ }
+ else
+ {
+ list.addAll(_wildCardBindingKey2queues.get(bindingKey));
+ }
+ }
}
- if (matching)
+ }
+ if(!_simpleBindingKey2queues.isEmpty())
+ {
+ List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey);
+ if(list == null)
+ {
+ if(queues == null)
+ {
+ list = Collections.EMPTY_LIST;
+ }
+ else
+ {
+ list = new ArrayList<AMQQueue>(queues);
+ }
+ }
+ else if(queues != null)
{
- list.addAll(_routingKey2queues.get(queue));
+ list.addAll(queues);
}
+
}
return list;
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 57ae2bb6d4..e7c887f306 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -42,6 +42,7 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
@@ -205,10 +206,8 @@ public class FanoutExchange extends AbstractExchange
_logger.debug("Publishing message to queue " + _queues);
}
- for (AMQQueue q : _queues)
- {
- payload.enqueue(q);
- }
+ payload.enqueue(new ArrayList(_queues));
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index 2061803d65..32f58ed666 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -56,7 +56,7 @@ public class JMSSelectorFilter implements MessageFilter
catch (AMQException e)
{
//fixme this needs to be sorted.. it shouldn't happen
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
return false;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index bb48539f50..7cd4afdb77 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,18 +24,18 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
{
- private static final Logger _log = Logger.getLogger(BasicConsumeMethodHandler.class);
+ private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class);
private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler();
@@ -65,21 +65,21 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
else
{
- if (_log.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- _log.debug("BasicConsume: from '" + body.getQueue() +
- "' for:" + body.getConsumerTag() +
- " nowait:" + body.getNowait() +
- " args:" + body.getArguments());
+ _logger.debug("BasicConsume: from '" + body.getQueue() +
+ "' for:" + body.getConsumerTag() +
+ " nowait:" + body.getNowait() +
+ " args:" + body.getArguments());
}
AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue().intern());
if (queue == null)
{
- if (_log.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _log.trace("No queue for '" + body.getQueue() + "'");
+ _logger.debug("No queue for '" + body.getQueue() + "'");
}
if (body.getQueue() != null)
{
@@ -97,6 +97,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
final AMQShortString consumerTagName;
+ //Perform ACLs
+ vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
+
if (body.getConsumerTag() != null)
{
consumerTagName = body.getConsumerTag().intern();
@@ -123,7 +126,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
catch (org.apache.qpid.AMQInvalidArgumentException ise)
{
- _log.debug("Closing connection due to invalid selector");
+ _logger.debug("Closing connection due to invalid selector");
MethodRegistry methodRegistry = session.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 8d69697350..f8f9127809 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -27,10 +27,10 @@ import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -82,7 +82,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
- if(!queue.performGet(session, channel, !body.getNoAck()))
+
+ //Perform ACLs
+ vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
+
+ if (!queue.performGet(session, channel, !body.getNoAck()))
{
MethodRegistry methodRegistry = session.getMethodRegistry();
// TODO - set clusterId
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 66afc61751..0f99a21ee5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -25,20 +25,19 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
+public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
{
- private static final Logger _log = Logger.getLogger(BasicPublishMethodHandler.class);
+ private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class);
private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();
@@ -55,12 +54,9 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
-
-
- if (_log.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- _log.debug("Publish received on channel " + channelId);
+ _logger.debug("Publish received on channel " + channelId);
}
AMQShortString exchange = body.getExchange();
@@ -90,8 +86,12 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
throw body.getChannelNotFoundException(channelId);
}
+ //Access Control
+ vHost.getAccessManager().authorise(session, Permission.PUBLISH, body, e);
+
MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
- channel.setPublishFrame(info, session);
+ info.setExchange(exchange);
+ channel.setPublishFrame(info, session, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index c48dd902eb..069cc6ea2c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -100,9 +100,9 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
}
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 3f604480b9..054674aed4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -29,7 +29,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -55,8 +54,8 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore(),
- virtualHost.getExchangeRegistry());
+ final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
+ );
session.addChannel(channel);
ChannelOpenOkBody response;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index 4f62e0b930..f99e650979 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -23,14 +23,12 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.security.access.AccessResult;
-import org.apache.qpid.server.security.access.AccessRights;
import org.apache.log4j.Logger;
public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
@@ -79,22 +77,8 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
{
session.setVirtualHost(virtualHost);
- AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID(), AccessRights.Rights.ANY);
-
- switch (result.getStatus())
- {
- default:
- case REFUSED:
- String error = "Any access denied to vHost '" + virtualHostName + "' by "
- + result.getAuthorizer();
-
- _logger.warn(error);
-
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error);
- case GRANTED:
- _logger.info("Granted any access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID()
- + " by '" + result.getAuthorizer() + "'");
- }
+ //Perform ACL
+ virtualHost.getAccessManager().authorise(session, Permission.ACCESS ,body, virtualHost);
// See Spec (0.8.2). Section 3.1.2 Virtual Hosts
if (session.getContextKey() == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index c4a57f0286..9a98bc9659 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -26,11 +26,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -58,7 +58,12 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
-
+
+ if (!body.getPassive())
+ {
+ //Perform ACL if request is not passive
+ virtualHost.getAccessManager().authorise(session, Permission.CREATE, body);
+ }
if (_logger.isDebugEnabled())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index 8f36e6c767..888ffcb2e5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -21,13 +21,12 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -51,6 +50,9 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ //Perform ACLs
+ virtualHost.getAccessManager().authorise(session, Permission.DELETE,body,
+ exchangeRegistry.getExchange(body.getExchange()));
try
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index a365cd864a..0f6dc7a19d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -25,13 +25,13 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -106,6 +106,10 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
try
{
+
+ //Perform ACLs
+ virtualHost.getAccessManager().authorise(session, Permission.BIND, body, exch, queue, routingKey);
+
if (!exch.isBound(routingKey, body.getArguments(), queue))
{
queue.bind(routingKey, body.getArguments(), exch);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 067c6ac285..7df864f189 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,17 +20,15 @@
*/
package org.apache.qpid.server.handler;
-import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.exchange.ExchangeDefaults;
+
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
@@ -38,6 +36,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
@@ -47,7 +46,7 @@ import org.apache.commons.configuration.Configuration;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
- private static final Logger _log = Logger.getLogger(QueueDeclareHandler.class);
+ private static final Logger _logger = Logger.getLogger(QueueDeclareHandler.class);
private static final QueueDeclareHandler _instance = new QueueDeclareHandler();
@@ -56,7 +55,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
return _instance;
}
- @Configured(path = "queue.auto_register", defaultValue = "false")
+ @Configured(path = "queue.auto_register", defaultValue = "true")
public boolean autoRegister;
private final AtomicInteger _counter = new AtomicInteger();
@@ -76,6 +75,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
MessageStore store = virtualHost.getMessageStore();
+ if (!body.getPassive())
+ {
+ //Perform ACL if request is not passive
+ virtualHost.getAccessManager().authorise(session, Permission.CREATE, body);
+ }
final AMQShortString queueName;
@@ -109,7 +113,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
else
{
- queue = createQueue(queueName,body, virtualHost, session);
+ queue = createQueue(queueName, body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
store.createQueue(queue);
@@ -120,7 +124,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
queue.bind(queueName, null, defaultExchange);
- _log.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")");
+ _logger.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")");
}
}
}
@@ -152,7 +156,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queue.getConsumerCount());
session.writeFrame(responseBody.generateFrame(channelId));
- _log.info("Queue " + queueName + " declared successfully");
+ _logger.info("Queue " + queueName + " declared successfully");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 09b7de3a5c..310a73ffeb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.security.access.Permission;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -103,6 +104,10 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
}
else
{
+
+ //Perform ACLs
+ virtualHost.getAccessManager().authorise(session, Permission.DELETE, body, queue);
+
int purged = queue.delete(body.getIfUnused(), body.getIfEmpty());
if (queue.isDurable())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index 5bdca93bc6..cce49f13c7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.security.access.Permission;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
@@ -100,6 +101,10 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
else
{
+
+ //Perform ACLs
+ virtualHost.getAccessManager().authorise(session, Permission.PURGE, body, queue);
+
long purged = queue.clearQueue(channel.getStoreContext());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
index b056fa6797..e758e315aa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
@@ -1,110 +1,113 @@
-package org.apache.qpid.server.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.protocol.AMQConstant;
-
-public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
-{
- private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
-
- private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
-
- public static QueueUnbindHandler getInstance()
- {
- return _instance;
- }
-
- private QueueUnbindHandler()
- {
- }
-
- public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
-
- final AMQQueue queue;
- final AMQShortString routingKey;
-
- if (body.getQueue() == null)
- {
- AMQChannel channel = session.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
-
- queue = channel.getDefaultQueue();
-
- if (queue == null)
- {
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
- }
-
- routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
-
- }
- else
- {
- queue = queueRegistry.getQueue(body.getQueue());
- routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
- }
-
- if (queue == null)
- {
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
- }
- final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
- if (exch == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
- }
-
-
- try
- {
- queue.unBind(routingKey, body.getArguments(), exch);
- }
- catch (AMQInvalidRoutingKeyException rke)
- {
- throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
- }
- catch (AMQException e)
- {
- if(e.getErrorCode() == AMQConstant.NOT_FOUND)
- {
- throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
- }
- throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
- }
-
- if (_log.isInfoEnabled())
- {
- _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
- }
-
- MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
-
-
- }
-}
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.security.access.Permission;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
+{
+ private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
+
+ private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
+
+ public static QueueUnbindHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueUnbindHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+ if (body.getQueue() == null)
+ {
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+ }
+
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+
+ }
+ else
+ {
+ queue = queueRegistry.getQueue(body.getQueue());
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+ }
+
+ if (queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ }
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+ }
+
+ //Perform ACLs
+ virtualHost.getAccessManager().authorise(session, Permission.UNBIND, body, queue);
+
+ try
+ {
+ queue.unBind(routingKey, body.getArguments(), exch);
+ }
+ catch (AMQInvalidRoutingKeyException rke)
+ {
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
+ }
+ catch (AMQException e)
+ {
+ if(e.getErrorCode() == AMQConstant.NOT_FOUND)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
+ }
+ throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
+ }
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java b/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
index 4fb260472d..a0ecc2bd85 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.management;
-import org.apache.qpid.server.security.access.UserManagement;
+import org.apache.qpid.server.security.access.management.UserManagement;
import org.apache.log4j.Logger;
import javax.management.remote.MBeanServerForwarder;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 1bfe1e3d35..d7a879180a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -72,7 +72,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -100,8 +100,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -127,7 +127,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final StoreContext storeContext = message.getStoreContext();
final long messageId = message.getMessageId();
- ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -151,8 +151,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -171,7 +171,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
- private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -187,10 +187,10 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame.toByteBuffer();
+ return deliverFrame;
}
- private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -205,7 +205,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame.toByteBuffer();
+ return getOkFrame;
}
public byte getProtocolMinorVersion()
@@ -218,7 +218,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return getProtocolSession().getProtocolMajorVersion();
}
- private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicReturnBody basicReturnBody =
@@ -228,13 +228,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
message.getMessagePublishInfo().getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
- return returnFrame.toByteBuffer();
+ return returnFrame;
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+ AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -247,14 +247,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 0bc2fcf6f7..646ef43826 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -12,10 +12,14 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -46,9 +50,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
@@ -58,8 +62,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
if(bodyCount == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
writeFrame(compositeBlock);
}
@@ -73,9 +77,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
+
+ CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
//
@@ -84,7 +88,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext,messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -93,6 +97,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
{
@@ -101,11 +113,10 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final StoreContext storeContext = message.getStoreContext();
final long messageId = message.getMessageId();
- ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
if(bodyCount == 0)
@@ -124,9 +135,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -135,7 +146,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext, messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -145,41 +156,84 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
- private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- BasicDeliverBody deliverBody =
- methodRegistry.createBasicDeliverBody(consumerTag,
- deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+
+ final boolean isRedelivered = messageHandle.isRedelivered();
+ final AMQShortString exchangeName = pb.getExchange();
+ final AMQShortString routingKey = pb.getRoutingKey();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
- return deliverFrame.toByteBuffer();
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
}
- private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
BasicGetOkBody getOkBody =
- methodRegistry.createBasicGetOkBody(deliveryTag,
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
messageHandle.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame.toByteBuffer();
+ return getOkFrame;
}
public byte getProtocolMinorVersion()
@@ -192,26 +246,25 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return getProtocolSession().getProtocolMajorVersion();
}
- private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
BasicReturnBody basicReturnBody =
- methodRegistry.createBasicReturnBody(replyCode,
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
- return returnFrame.toByteBuffer();
+ return returnFrame;
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+ AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
//
@@ -221,14 +274,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
@@ -252,9 +304,69 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
index be22a90d0b..9191ecf6ed 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
@@ -21,7 +21,6 @@ package org.apache.qpid.server.plugins;
import java.io.File;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,7 +29,6 @@ import org.apache.felix.framework.Felix;
import org.apache.felix.framework.cache.BundleCache;
import org.apache.felix.framework.util.FelixConstants;
import org.apache.felix.framework.util.StringMap;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeType;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleException;
@@ -72,7 +70,7 @@ public class PluginManager
"org.apache.qpid.server.queue; version=0.2.1," +
"javax.management.openmbean; version=1.0.0,"+
"javax.management; version=1.0.0,"+
- "uk.co.thebadgerset.junit.extensions.util; version=0.6.1,"
+ "org.apache.qpid.junit.extensions.util; version=0.6.1,"
);
if (plugindir == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 0fe6d3636e..4267642b14 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -109,7 +109,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private List<Integer> _closingChannelsList = new ArrayList<Integer>();
+ private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
@@ -138,11 +138,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
catch (RuntimeException e)
{
e.printStackTrace();
- // throw e;
+ throw e;
}
-
- // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
@@ -209,26 +207,39 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_logger.debug("Frame Received: " + frame);
}
- if (body instanceof AMQMethodBody)
- {
- methodFrameReceived(channelId, (AMQMethodBody) body);
- }
- else if (body instanceof ContentHeaderBody)
- {
- contentHeaderReceived(channelId, (ContentHeaderBody) body);
- }
- else if (body instanceof ContentBody)
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
{
- contentBodyReceived(channelId, (ContentBody) body);
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
+ }
+
+ return;
+ }
}
- else if (body instanceof HeartbeatBody)
+
+
+
+ try
{
- // NO OP
+ body.handle(channelId, this);
}
- else
+ catch (AMQException e)
{
- _logger.warn("Unrecognised frame " + frame.getClass().getName());
+ closeChannel(channelId);
+ throw e;
}
+
}
private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -271,32 +282,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
- private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+ public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((evt.getMethod() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
- }
- else
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
- }
-
- return;
- }
- }
-
try
{
try
@@ -358,6 +348,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_logger.info("Closing connection due to: " + e.getMessage());
}
+ markChannelAwaitingCloseOk(channelId);
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(channelId));
@@ -365,17 +356,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
catch (Exception e)
{
- _stateManager.error(e);
+
for (AMQMethodListener listener : _frameListeners)
{
listener.error(e);
}
+ _logger.error("Unexpected exception while processing frame. Closing connection.", e);
+
_minaProtocolSession.close();
}
}
- private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
@@ -384,13 +377,18 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
- private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
channel.publishContentBody(body, this);
}
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+ {
+ // NO - OP
+ }
+
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -539,7 +537,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
try
{
channel.close(this);
- markChannelawaitingCloseOk(channelId);
+ markChannelAwaitingCloseOk(channelId);
}
finally
{
@@ -550,11 +548,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void closeChannelOk(int channelId)
{
- removeChannel(channelId);
+ // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler.
+ // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory.
+ // We do it from the Close Handler as we are sending the OK back to the client.
+ // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
+ // will send a close-ok.. Where we should call removeChannel.
+ // However, due to the poor exception handling on the client. The client-user will be notified of the
+ // InvalidArgument and if they then decide to close the session/connection then the there will be time
+ // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed.
+ //removeChannel(channelId);
_closingChannelsList.remove(new Integer(channelId));
}
- private void markChannelawaitingCloseOk(int channelId)
+ private void markChannelAwaitingCloseOk(int channelId)
{
_closingChannelsList.add(channelId);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 543e043bed..d8dbf97e49 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -29,9 +29,8 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
@@ -57,6 +56,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
private final IApplicationRegistry _applicationRegistry;
+ private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144";
+ private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144";
+
+ private final int BUFFER_READ_LIMIT_SIZE;
+ private final int BUFFER_WRITE_LIMIT_SIZE;
public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
{
@@ -66,6 +70,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
{
_applicationRegistry = applicationRegistry;
+
+ // Read the configuration from the application registry
+ BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE));
+ BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE));
+
_logger.debug("AMQPFastProtocolHandler created");
}
@@ -82,7 +91,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
createSession(protocolSession, _applicationRegistry, codecFactory);
_logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
+ final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
getConfiguredObject(ConnectorConfiguration.class);
@@ -114,27 +123,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
}
- if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false))
+ if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false))
{
try
{
// //Add IO Protection Filters
IoFilterChain chain = protocolSession.getFilterChain();
- int buf_size = 32768;
- if (protocolSession.getConfig() instanceof SocketSessionConfig)
- {
- buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize();
- }
protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE);
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE);
writefilter.attach(chain);
protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index d9a9d2273b..f501bc27d1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.exchange.Exchange;
import java.util.HashMap;
import java.util.HashSet;
@@ -82,12 +83,16 @@ public class AMQMessage
private long _expiration;
- private final int hashcode = System.identityHashCode(this);
+
+
+ private Exchange _exchange;
+ private static final boolean SYNCED_CLOCKS =
+ ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false);
public String debugIdentity()
{
- return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
+ return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
public void setExpiration()
@@ -97,7 +102,7 @@ public class AMQMessage
long timestamp =
((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
- if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
+ if (SYNCED_CLOCKS)
{
_expiration = expiration;
}
@@ -126,6 +131,21 @@ public class AMQMessage
return _referenceCount.get() > 0;
}
+ public void setExchange(final Exchange exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public void route() throws AMQException
+ {
+ _exchange.route(this);
+ }
+
+ public void enqueue(final List<AMQQueue> queues)
+ {
+ _transientMessageData.setDestinationQueues(queues);
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -637,8 +657,6 @@ public class AMQMessage
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
- _transientMessageData = null;
-
for (AMQQueue q : destinationQueues)
{
// Increment the references to this message for each queue delivery.
@@ -649,7 +667,7 @@ public class AMQMessage
}
finally
{
- destinationQueues.clear();
+
// Remove refence for routing process . Reference count should now == delivered queue count
decrementReference(storeContext);
}
@@ -687,10 +705,6 @@ public class AMQMessage
_transientMessageData = transientMessageData;
}
- public void clearTransientMessageData()
- {
- _transientMessageData = null;
- }
public String toString()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 53c36d9718..7c6db0b4b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -36,7 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -154,10 +155,9 @@ public class AMQQueue implements Managable, Comparable
/** total messages received by the queue since startup. */
public AtomicLong _totalMessagesReceived = new AtomicLong();
- public int compareTo(Object o)
- {
- return _name.compareTo(((AMQQueue) o).getName());
- }
+
+ private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -200,6 +200,13 @@ public class AMQQueue implements Managable, Comparable
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+
+ // This ensure that the notification checks for the configured alerts are created.
+ setMaximumMessageAge(_maximumMessageAge);
+ setMaximumMessageCount(_maximumMessageCount);
+ setMaximumMessageSize(_maximumMessageSize);
+ setMaximumQueueDepth(_maximumQueueDepth);
+
}
private AMQQueueMBean createMBean() throws AMQException
@@ -214,7 +221,7 @@ public class AMQQueue implements Managable, Comparable
}
}
- public AMQShortString getName()
+ public final AMQShortString getName()
{
return _name;
}
@@ -540,9 +547,17 @@ public class AMQQueue implements Managable, Comparable
return _maximumMessageSize;
}
- public void setMaximumMessageSize(long value)
+ public void setMaximumMessageSize(final long maximumMessageSize)
{
- _maximumMessageSize = value;
+ _maximumMessageSize = maximumMessageSize;
+ if(maximumMessageSize == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
}
public int getConsumerCount()
@@ -565,9 +580,20 @@ public class AMQQueue implements Managable, Comparable
return _maximumMessageCount;
}
- public void setMaximumMessageCount(long value)
+ public void setMaximumMessageCount(final long maximumMessageCount)
{
- _maximumMessageCount = value;
+ _maximumMessageCount = maximumMessageCount;
+ if(maximumMessageCount == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+
+
+
}
public long getMaximumQueueDepth()
@@ -576,9 +602,18 @@ public class AMQQueue implements Managable, Comparable
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(long value)
+ public void setMaximumQueueDepth(final long maximumQueueDepth)
{
- _maximumQueueDepth = value;
+ _maximumQueueDepth = maximumQueueDepth;
+ if(maximumQueueDepth == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+
}
public long getOldestMessageArrivalTime()
@@ -661,6 +696,12 @@ public class AMQQueue implements Managable, Comparable
}
_subscribers.addSubscriber(subscription);
+ if(exclusive)
+ {
+ _subscribers.setExclusive(true);
+ }
+
+ subscription.start();
}
private boolean isExclusive()
@@ -692,6 +733,7 @@ public class AMQQueue implements Managable, Comparable
ps, channel, consumerTag, this));
}
+ _subscribers.setExclusive(false);
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
consumerTag)))
@@ -805,7 +847,7 @@ public class AMQQueue implements Managable, Comparable
public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
{
AMQMessage msg = entry.getMessage();
- _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
+ _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -938,6 +980,14 @@ public class AMQQueue implements Managable, Comparable
public void setMaximumMessageAge(long maximumMessageAge)
{
_maximumMessageAge = maximumMessageAge;
+ if(maximumMessageAge == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
}
public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
@@ -950,4 +1000,25 @@ public class AMQQueue implements Managable, Comparable
return new QueueEntry(this, amqMessage);
}
+ public int compareTo(Object o)
+ {
+ return _name.compareTo(((AMQQueue) o).getName());
+ }
+
+
+ public void removeExpiredIfNoSubscribers() throws AMQException
+ {
+ synchronized(_subscribers.getChangeLock())
+ {
+ if(_subscribers.isEmpty())
+ {
+ _deliveryMgr.removeExpired();
+ }
+ }
+ }
+
+ public final Set<NotificationCheck> getNotificationChecks()
+ {
+ return _notificationChecks;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 9e32de3f76..348a136f9d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -54,10 +54,7 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
@@ -97,6 +94,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
private Notification _lastNotification = null;
+
+
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -249,16 +249,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
public void checkForNotification(AMQMessage msg) throws AMQException, JMException
{
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+ final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
- for (NotificationCheck check : NotificationCheck.values())
+ if(!notificationChecks.isEmpty())
{
- if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+
+ for (NotificationCheck check : notificationChecks)
{
- if (check.notifyIfNecessary(msg, _queue, this))
+ if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
- _lastNotificationTimes[check.ordinal()] = currentTime;
+ if (check.notifyIfNecessary(msg, _queue, this))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 3cf2cb9b12..7dfcae95c3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -212,6 +212,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ /**
+ * NOTE : This method should only be called when there are no active subscribers
+ */
+ public void removeExpired() throws AMQException
+ {
+ _lock.lock();
+
+
+ for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
+ {
+ QueueEntry entry = iter.next();
+ if(entry.expired())
+ {
+ // fixme: Currently we have to update the total byte size here for the data in the queue
+ _totalMessageSize.addAndGet(-entry.getSize());
+ _queue.dequeue(_reapingStoreContext,entry);
+ iter.remove();
+ }
+ }
+
+
+ _lock.unlock();
+ }
+
/** @return the state of the async processor. */
public boolean isProcessingAsync()
{
@@ -278,9 +302,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void populatePreDeliveryQueue(Subscription subscription)
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
+ _log.debug("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
}
Iterator<QueueEntry> currentQueue = _messages.iterator();
@@ -289,13 +313,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
QueueEntry entry = currentQueue.next();
- if (!entry.getDeliveredToConsumer())
+ if (subscription.hasInterest(entry))
{
- if (subscription.hasInterest(entry))
- {
- subscription.enqueueForPreDelivery(entry, false);
- }
+ subscription.enqueueForPreDelivery(entry, false);
}
+
}
}
@@ -339,8 +361,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
deliveryTag, _queue.getMessageCount());
- _totalMessageSize.addAndGet(-entry.getSize());
+
}
+ _totalMessageSize.addAndGet(-entry.getSize());
if (!acks)
{
@@ -484,9 +507,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (purgeMessage(entry, sub, purgeOnly))
{
AMQMessage message = entry.getMessage();
- // if we are purging then ensure we mark this message taken for the current subscriber
- // the current subscriber may be null in the case of a get or a purge but this is ok.
-// boolean alreadyTaken = message.taken(_queue, sub);
//remove the already taken message or expired
QueueEntry removed = messages.poll();
@@ -494,7 +514,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
assert removed == entry;
// if the message expired then the _totalMessageSize needs adjusting
- if (message.expired(_queue) && !entry.getDeliveredToConsumer())
+ if (message.expired(_queue) && !entry.taken(sub))
{
_totalMessageSize.addAndGet(-entry.getSize());
@@ -512,9 +532,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//else the clean up is not required as the message has already been taken for this queue therefore
// it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace("Removed taken message:" + message.debugIdentity());
+ _log.debug("Removed taken message:" + message.debugIdentity());
}
// try the next message
@@ -610,9 +630,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+ _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
") from queue (" + System.identityHashCode(messageQueue) +
") AMQQueue (" + System.identityHashCode(queue) + ")");
}
@@ -638,9 +658,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// message will be null if we have no messages in the messageQueue.
if (entry == null)
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ _log.debug(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
}
return;
}
@@ -680,9 +700,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (messageQueue == sub.getResendQueue())
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub);
+ _log.debug(debugIdentity() + "All messages sent from resendQueue for " + sub);
}
if (messageQueue.isEmpty())
{
@@ -842,17 +862,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
for (Subscription sub : _subscriptions.getSubscriptions())
{
- // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
- if (_queue.isShared() && entry.getDeliveredToConsumer())
- {
- if (debugEnabled)
- {
- _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(entry) +
- ") is already delivered.");
- }
- continue;
- }
-
// Only give the message to those that want them.
if (sub.hasInterest(entry))
{
@@ -894,9 +903,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (!s.isSuspended())
{
- if (_log.isTraceEnabled())
+ if (debugEnabled)
{
- _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
+ _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index f7f35a9319..1568f58e2e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -97,4 +97,6 @@ interface DeliveryManager
long getOldestMessageArrival();
void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg);
+
+ void removeExpired() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
index 60c1a8f574..e6377b33da 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
@@ -51,7 +51,7 @@ class ExchangeBindings
ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
{
- _routingKey = routingKey;
+ _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
_exchange = exchange;
_arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
}
@@ -74,8 +74,7 @@ class ExchangeBindings
public int hashCode()
{
return (_exchange == null ? 0 : _exchange.hashCode())
- + (_routingKey == null ? 0 : _routingKey.hashCode())
- + (_arguments == null ? 0 : _arguments.hashCode());
+ + (_routingKey == null ? 0 : _routingKey.hashCode());
}
public boolean equals(Object o)
@@ -86,8 +85,7 @@ class ExchangeBindings
}
ExchangeBinding eb = (ExchangeBinding) o;
return _exchange.equals(eb._exchange)
- && _routingKey.equals(eb._routingKey)
- && _arguments.equals(eb._arguments);
+ && _routingKey.equals(eb._routingKey);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index 6b3d65661f..6f9efd3200 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -29,9 +29,9 @@ public enum NotificationCheck
{
boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
- int msgCount = queue.getMessageCount();
+ int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
return true;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index a706098b71..96ce6743ec 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -45,8 +45,6 @@ public interface Subscription
void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
- boolean isAutoClose();
-
void close();
boolean isClosed();
@@ -60,4 +58,6 @@ public interface Subscription
Object getSendLock();
AMQChannel getChannel();
+
+ void start();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index e631481cc8..bde3ad8ec9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -292,14 +292,17 @@ public class SubscriptionImpl implements Subscription
queue.dequeue(storeContext, entry);
}
+/*
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
+ }
+*/
+
synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
- if (_sendLock.get())
- {
- _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
- }
if (_acks)
{
@@ -308,10 +311,11 @@ public class SubscriptionImpl implements Subscription
protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
- if (!_acks)
- {
- entry.getMessage().decrementReference(storeContext);
- }
+
+ }
+ if (!_acks)
+ {
+ entry.getMessage().decrementReference(storeContext);
}
}
finally
@@ -367,59 +371,60 @@ public class SubscriptionImpl implements Subscription
// return false;
}
- final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+
//todo - client id should be recoreded and this test removed but handled below
- if (_noLocal && publisher != null)
+ if (_noLocal)
{
- // We don't want local messages so check to see if message is one we sent
- Object localInstance;
- Object msgInstance;
- if ((protocolSession.getClientProperties() != null) &&
- (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+ if(publisher != null)
+
{
+ // We don't want local messages so check to see if message is one we sent
+ Object localInstance;
+ Object msgInstance;
- if ((publisher.getClientProperties() != null) &&
- (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ if ((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (localInstance == msgInstance || localInstance.equals(msgInstance))
+
+ if ((publisher.getClientProperties() != null) &&
+ (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-// msg.debugIdentity() + ")");
-// }
- return false;
+ if (localInstance == msgInstance || localInstance.equals(msgInstance))
+ {
+ // if (_logger.isTraceEnabled())
+ // {
+ // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ // msg.debugIdentity() + ")");
+ // }
+ return false;
+ }
}
}
- }
- else
- {
+ else
+ {
- localInstance = protocolSession.getClientIdentifier();
- //todo - client id should be recoreded and this test removed but handled here
+ localInstance = protocolSession.getClientIdentifier();
+ //todo - client id should be recoreded and this test removed but handled here
- msgInstance = publisher.getClientIdentifier();
- if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-// msg.debugIdentity() + ")");
-// }
- return false;
+ msgInstance = publisher.getClientIdentifier();
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ {
+ // if (_logger.isTraceEnabled())
+ // {
+ // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ // msg.debugIdentity() + ")");
+ // }
+ return false;
+ }
}
- }
-
+ }
}
- if (_logger.isTraceEnabled())
- {
- _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
- }
return checkFilters(entry);
}
@@ -433,23 +438,7 @@ public class SubscriptionImpl implements Subscription
private boolean checkFilters(QueueEntry msg)
{
- if (_filters != null)
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has filters.");
-// }
- return _filters.allAllow(msg.getMessage());
- }
- else
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no filters");
-// }
-
- return true;
- }
+ return (_filters == null) || _filters.allAllow(msg.getMessage());
}
public Queue<QueueEntry> getPreDeliveryQueue()
@@ -472,7 +461,7 @@ public class SubscriptionImpl implements Subscription
}
}
- public boolean isAutoClose()
+ private boolean isAutoClose()
{
return _autoClose;
}
@@ -534,19 +523,24 @@ public class SubscriptionImpl implements Subscription
{
_logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this);
- ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
- _sentClose = true;
-
- //fixme JIRA do this better
+ boolean unregisteredOK = false;
try
{
- channel.unsubscribeConsumer(protocolSession, consumerTag);
+ unregisteredOK = channel.unsubscribeConsumer(protocolSession, consumerTag);
}
catch (AMQException e)
{
// Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag.
+ _logger.info("Unable to UnsubscribeConsumer :" + consumerTag +" so not going to send CancelOK.");
+ }
+
+ if (unregisteredOK)
+ {
+ ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
+ _sentClose = true;
}
+
}
}
@@ -563,9 +557,9 @@ public class SubscriptionImpl implements Subscription
{
QueueEntry resent = _resendQueue.poll();
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Removed for resending:" + resent.debugIdentity());
+ _logger.debug("Removed for resending:" + resent.debugIdentity());
}
resent.release();
@@ -613,7 +607,7 @@ public class SubscriptionImpl implements Subscription
public boolean wouldSuspend(QueueEntry msg)
{
- return channel.wouldSuspend(msg.getMessage());
+ return _acks && channel.wouldSuspend(msg.getMessage());
}
public Queue<QueueEntry> getResendQueue()
@@ -677,4 +671,19 @@ public class SubscriptionImpl implements Subscription
return channel;
}
+ public void start()
+ {
+ //Check to see if we need to autoclose
+ if (filtersMessages())
+ {
+ if (isAutoClose())
+ {
+ if (_messages.isEmpty())
+ {
+ autoclose();
+ }
+ }
+ }
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index b73b8d7e07..882efd380d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -37,7 +37,9 @@ class SubscriptionSet implements WeightedSubscriptionManager
/** Used to control the round robin delivery of content */
private int _currentSubscriber;
- private final Object _subscriptionsChange = new Object();
+
+ private final Object _changeLock = new Object();
+ private volatile boolean _exclusive;
/** Accessor for unit tests. */
@@ -48,7 +50,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
public void addSubscriber(Subscription subscription)
{
- synchronized (_subscriptionsChange)
+ synchronized (_changeLock)
{
_subscriptions.add(subscription);
}
@@ -66,7 +68,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
// TODO: possibly need O(1) operation here.
Subscription sub = null;
- synchronized (_subscriptionsChange)
+ synchronized (_changeLock)
{
int subIndex = _subscriptions.indexOf(subscription);
@@ -115,10 +117,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
*/
public Subscription nextSubscriber(QueueEntry msg)
{
- if (_subscriptions.isEmpty())
- {
- return null;
- }
+
try
{
@@ -142,30 +141,64 @@ class SubscriptionSet implements WeightedSubscriptionManager
private Subscription nextSubscriberImpl(QueueEntry msg)
{
- final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext())
+ if(_exclusive)
{
- Subscription subscription = iterator.next();
- ++_currentSubscriber;
- subscriberScanned();
-
- if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ try
{
- if (subscription.hasInterest(msg))
+ Subscription subscription = _subscriptions.get(0);
+ subscriberScanned();
+
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
{
- // if the queue is not empty then this client is ready to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a message
- // OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ if (subscription.hasInterest(msg))
{
- return subscription;
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
}
}
}
+ catch(IndexOutOfBoundsException e)
+ {
+ }
+ return null;
}
+ else
+ {
+ if (_subscriptions.isEmpty())
+ {
+ return null;
+ }
+ final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
+ while (iterator.hasNext())
+ {
+ Subscription subscription = iterator.next();
+ ++_currentSubscriber;
+ subscriberScanned();
- return null;
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
}
/** Overridden in test classes. */
@@ -226,4 +259,16 @@ class SubscriptionSet implements WeightedSubscriptionManager
{
return _subscriptions.size();
}
+
+
+ public Object getChangeLock()
+ {
+ return _changeLock;
+ }
+
+ public void setExclusive(final boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
index 79ee6b93a3..9b91c71a1d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -60,7 +62,7 @@ public class TransientMessageData
* delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
* by the message handle.
*/
- private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+ private List<AMQQueue> _destinationQueues;
public MessagePublishInfo getMessagePublishInfo()
{
@@ -74,7 +76,7 @@ public class TransientMessageData
public List<AMQQueue> getDestinationQueues()
{
- return _destinationQueues;
+ return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues;
}
public void setDestinationQueues(List<AMQQueue> destinationQueues)
@@ -109,6 +111,10 @@ public class TransientMessageData
public void addDestinationQueue(AMQQueue queue)
{
+ if(_destinationQueues == null)
+ {
+ _destinationQueues = new ArrayList<AMQQueue>();
+ }
_destinationQueues.add(queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index 42c32dcf00..fef958000a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,8 +39,8 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.AMQException;
@@ -52,15 +52,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
private AuthenticationManager _authenticationManager;
- private AccessManager _accessManager;
+ private ACLPlugin _accessManager;
private PrincipalDatabaseManager _databaseManager;
private VirtualHostRegistry _virtualHostRegistry;
-
- private final Map<String, VirtualHost> _virtualHosts = new ConcurrentHashMap<String, VirtualHost>();
-
private PluginManager _pluginManager;
@@ -110,9 +107,9 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
_virtualHostRegistry = new VirtualHostRegistry();
- _accessManager = new AccessManagerImpl("default", _configuration);
+ _accessManager = ACLManager.loadACLManager("default", _configuration);
- _databaseManager = new ConfigurationFilePrincipalDatabaseManager();
+ _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration);
_authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
@@ -121,7 +118,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
_managedObjectRegistry.start();
_pluginManager = new PluginManager(_configuration.getString("plugin-directory"));
-
+
initialiseVirtualHosts();
}
@@ -154,7 +151,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
return _virtualHostRegistry;
}
- public AccessManager getAccessManager()
+ public ACLPlugin getAccessManager()
{
return _accessManager;
}
@@ -178,7 +175,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
return getConfiguration().getList("virtualhosts.virtualhost.name");
}
-
+
public PluginManager getPluginManager()
{
return _pluginManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index 6aac21a161..ca10fbdba2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
-import org.apache.qpid.server.security.access.AccessManager;
+import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public interface IApplicationRegistry
@@ -68,8 +68,8 @@ public interface IApplicationRegistry
VirtualHostRegistry getVirtualHostRegistry();
- AccessManager getAccessManager();
-
+ ACLPlugin getAccessManager();
+
PluginManager getPluginManager();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
index 35d036d20f..539f32a732 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
@@ -23,33 +23,35 @@ package org.apache.qpid.server.security.access;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.access.plugins.DenyAll;
import org.apache.qpid.configuration.PropertyUtils;
import org.apache.log4j.Logger;
import java.util.List;
import java.lang.reflect.Method;
-import java.security.Principal;
-public class AccessManagerImpl implements AccessManager
+public class ACLManager
{
- private static final Logger _logger = Logger.getLogger(AccessManagerImpl.class);
+ private static final Logger _logger = Logger.getLogger(ACLManager.class);
- AccessManager _accessManager;
-
- public AccessManagerImpl(String name, Configuration hostConfig) throws ConfigurationException
+ public static ACLPlugin loadACLManager(String name, Configuration hostConfig) throws ConfigurationException
{
+ ACLPlugin aclPlugin = ApplicationRegistry.getInstance().getAccessManager();
+
if (hostConfig == null)
{
- _logger.warn("No Configuration specified. Using default access controls for VirtualHost:'" + name + "'");
- return;
+ _logger.warn("No Configuration specified. Using default ACLPlugin '" + aclPlugin.getPluginName()
+ + "' for VirtualHost:'" + name + "'");
+ return aclPlugin;
}
String accessClass = hostConfig.getString("security.access.class");
if (accessClass == null)
{
- _logger.warn("No access control specified. Using default access controls for VirtualHost:'" + name + "'");
- return;
+
+ _logger.warn("No ACL Plugin specified. Using default ACL Plugin '" + aclPlugin.getPluginName() +
+ "' for VirtualHost:'" + name + "'");
+ return aclPlugin;
}
Object o;
@@ -59,26 +61,35 @@ public class AccessManagerImpl implements AccessManager
}
catch (Exception e)
{
- throw new ConfigurationException("Error initialising access control: " + e, e);
+ throw new ConfigurationException("Error initialising ACL: " + e, e);
}
- if (!(o instanceof AccessManager))
+ if (!(o instanceof ACLPlugin))
{
- throw new ConfigurationException("Access control must implement the VirtualHostAccess interface");
+ throw new ConfigurationException("ACL Plugins must implement the ACLPlugin interface");
}
- initialiseAccessControl((AccessManager) o, hostConfig);
+ initialiseAccessControl((ACLPlugin) o, hostConfig);
- _accessManager = (AccessManager) o;
-
- _logger.info("Initialised access control for virtualhost '" + name + "' successfully");
+ aclPlugin = getManager((ACLPlugin) o);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Initialised ACL Plugin '" + aclPlugin.getPluginName()
+ + "' for virtualhost '" + name + "' successfully");
+ }
+ return aclPlugin;
}
- private void initialiseAccessControl(AccessManager accessManager, Configuration config)
+ private static void initialiseAccessControl(ACLPlugin accessManager, Configuration config)
throws ConfigurationException
{
+ //First provide the ACLPlugin with the host configuration
+
+ accessManager.setConfiguaration(config);
+
+ //Provide additional attribute customisation.
String baseName = "security.access.attributes.attribute.";
List<String> argumentNames = config.getList(baseName + "name");
List<String> argumentValues = config.getList(baseName + "value");
@@ -123,33 +134,28 @@ public class AccessManagerImpl implements AccessManager
}
}
- public AccessResult isAuthorized(Accessable accessObject, String username)
- {
- return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ);
- }
- public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights)
+ private static ACLPlugin getManager(ACLPlugin manager)
{
- if (_accessManager == null)
+ if (manager == null)
{
- if (ApplicationRegistry.getInstance().getAccessManager() == this)
+ if (ApplicationRegistry.getInstance().getAccessManager() == null)
{
- _logger.warn("No Default access manager specified DENYING ALL ACCESS");
- return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
+ return new DenyAll();
}
else
{
- return ApplicationRegistry.getInstance().getAccessManager().isAuthorized(accessObject, user, rights);
+ return ApplicationRegistry.getInstance().getAccessManager();
}
}
else
{
- return _accessManager.isAuthorized(accessObject, user, rights);
+ return manager;
}
}
- public String getName()
+ public static Logger getLogger()
{
- return "AccessManagerImpl";
+ return _logger;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java
new file mode 100644
index 0000000000..7855f147b4
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.commons.configuration.Configuration;
+
+
+public interface ACLPlugin
+{
+ /**
+ * Pseudo-Code:
+ * Identify requested RighConnectiont
+ * Lookup users ability for that right.
+ * if rightsExists
+ * Validate right on object
+ * Return result
+ * e.g
+ * User, CONSUME , Queue
+ * User, CONSUME , Exchange + RoutingKey
+ * User, PUBLISH , Exchange + RoutingKey
+ * User, CREATE , Exchange || Queue
+ * User, BIND , Exchange + RoutingKey + Queue
+ *
+ * @param session - The session requesting access
+ * @param permission - The permission requested
+ * @param parameters - The above objects that are used to authorise the request.
+ * @return The AccessResult decision
+ */
+ //todo potential refactor this ConnectionException Out of here
+ AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException;
+
+ String getPluginName();
+
+ void setConfiguaration(Configuration config);
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
index b8d8fc605a..89cead69b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
@@ -30,15 +30,15 @@ public class AccessResult
StringBuilder _authorizer;
AccessStatus _status;
- public AccessResult(AccessManager authorizer, AccessStatus status)
+ public AccessResult(ACLPlugin authorizer, AccessStatus status)
{
_status = status;
- _authorizer = new StringBuilder(authorizer.getName());
+ _authorizer = new StringBuilder(authorizer.getPluginName());
}
- public void setAuthorizer(AccessManager authorizer)
+ public void setAuthorizer(ACLPlugin authorizer)
{
- _authorizer.append(authorizer.getName());
+ _authorizer.append(authorizer.getPluginName());
}
public String getAuthorizer()
@@ -56,10 +56,10 @@ public class AccessResult
return _status;
}
- public void addAuthorizer(AccessManager accessManager)
+ public void addAuthorizer(ACLPlugin accessManager)
{
_authorizer.insert(0, "->");
- _authorizer.insert(0, accessManager.getName());
+ _authorizer.insert(0, accessManager.getPluginName());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java
deleted file mode 100644
index 1ddca3a64e..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.security.access;
-
-import java.security.Principal;
-
-public class AllowAll implements AccessManager
-{
-
- public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights)
- {
- return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
- }
-
- public AccessResult isAuthorized(Accessable accessObject, String username)
- {
- return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
- }
-
- public String getName()
- {
- return "AllowAll";
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java
deleted file mode 100644
index bf40eeba4e..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.security.access;
-
-import java.security.Principal;
-
-public class DenyAll implements AccessManager
-{
- public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights)
- {
- return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
- }
-
- public AccessResult isAuthorized(Accessable accessObject, String username)
- {
- return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
- }
-
- public String getName()
- {
- return "DenyAll";
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java
deleted file mode 100644
index 291bc714ed..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.security.access;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.FileNotFoundException;
-import java.io.File;
-import java.util.regex.Pattern;
-import java.security.Principal;
-
-/**
- * Represents a user database where the account information is stored in a simple flat file.
- *
- * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn
- *
- * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text.
- */
-public class FileAccessManager implements AccessManager
-{
- private static final Logger _logger = Logger.getLogger(FileAccessManager.class);
-
- protected File _accessFile;
-
- protected Pattern _regexp = Pattern.compile(":");
-
- private static final short USER_INDEX = 0;
- private static final short VIRTUALHOST_INDEX = 1;
-
- public void setAccessFile(String accessFile) throws FileNotFoundException
- {
- File f = new File(accessFile);
- _logger.info("FileAccessManager using file " + f.getAbsolutePath());
- _accessFile = f;
- if (!f.exists())
- {
- throw new FileNotFoundException("Cannot find access file " + f);
- }
- if (!f.canRead())
- {
- throw new FileNotFoundException("Cannot read access file " + f +
- ". Check permissions.");
- }
- }
-
- /**
- * Looks up the virtual hosts for a specified user in the access file.
- *
- * @param user The user to lookup
- *
- * @return a list of virtualhosts
- */
- private VirtualHostAccess[] lookupVirtualHost(String user)
- {
- String[] results = lookup(user, VIRTUALHOST_INDEX);
- VirtualHostAccess vhosts[] = new VirtualHostAccess[results.length];
-
- for (int index = 0; index < results.length; index++)
- {
- vhosts[index] = new VirtualHostAccess(results[index]);
- }
-
- return vhosts;
- }
-
-
- private String[] lookup(String user, int index)
- {
- try
- {
- BufferedReader reader = null;
- try
- {
- reader = new BufferedReader(new FileReader(_accessFile));
- String line;
-
- while ((line = reader.readLine()) != null)
- {
- String[] result = _regexp.split(line);
- if (result == null || result.length < (index + 1))
- {
- continue;
- }
-
- if (user.equals(result[USER_INDEX]))
- {
- return result[index].split(",");
- }
- }
- return null;
- }
- finally
- {
- if (reader != null)
- {
- reader.close();
- }
- }
- }
- catch (IOException ioe)
- {
- //ignore
- }
- return null;
- }
-
- public AccessResult isAuthorized(Accessable accessObject, String username)
- {
- return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ);
- }
-
- public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights)
- {
- if (accessObject instanceof VirtualHost)
- {
- VirtualHostAccess[] hosts = lookupVirtualHost(user.getName());
-
- if (hosts != null)
- {
- for (VirtualHostAccess host : hosts)
- {
- if (accessObject.getAccessableName().equals(host.getVirtualHost()))
- {
- if (host.getAccessRights().allows(rights))
- {
- return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
- }
- else
- {
- return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
- }
- }
- }
- }
- }
-// else if (accessObject instanceof AMQQueue)
-// {
-// String[] queues = lookupQueue(username, ((AMQQueue) accessObject).getVirtualHost());
-//
-// if (queues != null)
-// {
-// for (String queue : queues)
-// {
-// if (accessObject.getAccessableName().equals(queue))
-// {
-// return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
-// }
-// }
-// }
-// }
-
- return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
- }
-
- public String getName()
- {
- return "FileAccessManager";
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java
index d70a6dc8f4..5d439a99eb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java
@@ -1,34 +1,37 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.security.access;
-
-import java.security.Principal;
-
-public interface AccessManager
-{
- AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights);
-
- @Deprecated
- AccessResult isAuthorized(Accessable accessObject, String username);
-
- String getName();
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public enum Permission
+{
+ CONSUME,
+ PUBLISH,
+ CREATE,
+ ACCESS,
+ BIND,
+ UNBIND,
+ DELETE,
+ PURGE
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java
deleted file mode 100644
index 6ccadb2e7d..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.security.access;
-
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.log4j.Logger;
-
-import java.security.Principal;
-
-public class PrincipalDatabaseAccessManager implements AccessManager
-{
- private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAccessManager.class);
-
- PrincipalDatabase _database;
- AccessManager _default;
-
- public PrincipalDatabaseAccessManager()
- {
- _default = null;
- }
-
- public void setDefaultAccessManager(String defaultAM)
- {
- if (defaultAM.equals("AllowAll"))
- {
- _default = new AllowAll();
- }
-
- if (defaultAM.equals("DenyAll"))
- {
- _default = new DenyAll();
- }
- }
-
- public void setPrincipalDatabase(String database)
- {
- _database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(database);
- if (!(_database instanceof AccessManager))
- {
- _logger.warn("Database '" + database + "' cannot perform access management");
- }
- }
-
-
- public AccessResult isAuthorized(Accessable accessObject, String username)
- {
- return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ);
- }
-
- public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights)
- {
- AccessResult result;
-
- if (_database == null)
- {
- if (_default != null)
- {
- result = _default.isAuthorized(accessObject, username, rights);
- }
- else
- {
- throw new RuntimeException("Principal Database and default Access Manager are both null unable to perform Access Control");
- }
- }
- else
- {
- if (!(_database instanceof AccessManager))
- {
- _logger.warn("Specified PrincipalDatabase is not an AccessManager so using default AccessManager");
- result = _default.isAuthorized(accessObject, username, rights);
- }
- else
- {
- result = ((AccessManager) _database).isAuthorized(accessObject, username, rights);
- }
- }
-
- result.addAuthorizer(this);
-
- return result;
- }
-
- public String getName()
- {
- return "PrincipalDatabaseFileAccessManager";
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
new file mode 100755
index 0000000000..23073e0613
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
@@ -0,0 +1,579 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.Exchange;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PrincipalPermissions
+{
+
+ private static final Object CONSUME_QUEUES_KEY = new Object();
+ private static final Object CONSUME_TEMPORARY_KEY = new Object();
+ private static final Object CONSUME_OWN_QUEUES_ONLY_KEY = new Object();
+
+ private static final Object CREATE_QUEUES_KEY = new Object();
+ private static final Object CREATE_EXCHANGES_KEY = new Object();
+
+ private static final Object CREATE_QUEUE_TEMPORARY_KEY = new Object();
+ private static final Object CREATE_QUEUE_QUEUES_KEY = new Object();
+ private static final Object CREATE_QUEUE_EXCHANGES_KEY = new Object();
+
+ private static final Object CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY = new Object();
+ private static final Object CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY = new Object();
+
+ private static final int PUBLISH_EXCHANGES_KEY = 0;
+
+ private Map _permissions;
+
+ private String _user;
+
+
+ public PrincipalPermissions(String user)
+ {
+ _user = user;
+ _permissions = new ConcurrentHashMap();
+ }
+
+ public void grant(Permission permission, Object... parameters)
+ {
+ switch (permission)
+ {
+ case ACCESS:
+ break; // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS
+ case BIND:
+ break; // All the details are currently included in the create setup.
+ case CONSUME: // Parameters : AMQShortString queueName, Boolean Temporary, Boolean ownQueueOnly
+ Map consumeRights = (Map) _permissions.get(permission);
+
+ if (consumeRights == null)
+ {
+ consumeRights = new ConcurrentHashMap();
+ _permissions.put(permission, consumeRights);
+ }
+
+ //if we have parametsre
+ if (parameters.length > 0)
+ {
+ AMQShortString queueName = (AMQShortString) parameters[0];
+ Boolean temporary = (Boolean) parameters[1];
+ Boolean ownQueueOnly = (Boolean) parameters[2];
+
+ if (temporary)
+ {
+ consumeRights.put(CONSUME_TEMPORARY_KEY, true);
+ }
+ else
+ {
+ consumeRights.put(CONSUME_TEMPORARY_KEY, false);
+ }
+
+ if (ownQueueOnly)
+ {
+ consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true);
+ }
+ else
+ {
+ consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false);
+ }
+
+
+ LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY);
+ if (queues == null)
+ {
+ queues = new LinkedList();
+ consumeRights.put(CONSUME_QUEUES_KEY, queues);
+ }
+
+ if (queueName != null)
+ {
+ queues.add(queueName);
+ }
+ }
+
+
+ break;
+ case CREATE: // Parameters : Boolean temporary, AMQShortString queueName
+ // , AMQShortString exchangeName , AMQShortString routingKey
+ // || AMQShortString exchangeName , AMQShortString Class
+
+ Map createRights = (Map) _permissions.get(permission);
+
+ if (createRights == null)
+ {
+ createRights = new ConcurrentHashMap();
+ _permissions.put(permission, createRights);
+
+ }
+
+ //The existence of the empty map mean permission to all.
+ if (parameters.length == 0)
+ {
+ return;
+ }
+
+
+ if (parameters[0] instanceof Boolean) //Create Queue :
+ // Boolean temporary, [AMQShortString queueName, AMQShortString exchangeName , AMQShortString routingKey]
+ {
+ Boolean temporary = (Boolean) parameters[0];
+
+ AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
+ AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null;
+ //Set the routingkey to the specified value or the queueName if present
+ AMQShortString routingKey = parameters.length > 3 ? (AMQShortString) parameters[3] : queueName;
+
+ // Get the queues map
+ Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+ if (create_queues == null)
+ {
+ create_queues = new ConcurrentHashMap();
+ createRights.put(CREATE_QUEUES_KEY, create_queues);
+ }
+
+ //Allow all temp queues to be created
+ create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary);
+
+ //Create empty list of queues
+ Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+ if (create_queues_queues == null)
+ {
+ create_queues_queues = new ConcurrentHashMap();
+ create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues);
+ }
+
+ // We are granting CREATE rights to all temporary queues only
+ if (parameters.length == 1)
+ {
+ return;
+ }
+
+ // if we have a queueName then we need to store any associated exchange / rk bindings
+ if (queueName != null)
+ {
+ Map queue = (Map) create_queues_queues.get(queueName);
+ if (queue == null)
+ {
+ queue = new ConcurrentHashMap();
+ create_queues_queues.put(queueName, queue);
+ }
+
+ if (exchangeName != null)
+ {
+ queue.put(exchangeName, routingKey);
+ }
+
+ //If no exchange is specified then the presence of the queueName in the map says any exchange is ok
+ }
+
+ // Store the exchange that we are being granted rights to. This will be used as part of binding
+
+ //Lookup the list of exchanges
+ Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+ if (create_queues_exchanges == null)
+ {
+ create_queues_exchanges = new ConcurrentHashMap();
+ create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges);
+ }
+
+ //if we have an exchange
+ if (exchangeName != null)
+ {
+ //Retrieve the list of permitted exchanges.
+ Map exchanges = (Map) create_queues_exchanges.get(exchangeName);
+
+ if (exchanges == null)
+ {
+ exchanges = new ConcurrentHashMap();
+ create_queues_exchanges.put(exchangeName, exchanges);
+ }
+
+ //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY
+ exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary);
+
+ //Store the binding details of queue/rk for this exchange.
+ if (queueName != null)
+ {
+ //Retrieve the list of permitted routingKeys.
+ Map rKeys = (Map) exchanges.get(exchangeName);
+
+ if (rKeys == null)
+ {
+ rKeys = new ConcurrentHashMap();
+ exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys);
+ }
+
+ rKeys.put(queueName, routingKey);
+ }
+ }
+ }
+ else // Create Exchange : AMQShortString exchangeName , AMQShortString Class
+ {
+ Map create_exchanges = (Map) createRights.get(CREATE_EXCHANGES_KEY);
+
+ if (create_exchanges == null)
+ {
+ create_exchanges = new ConcurrentHashMap();
+ createRights.put(CREATE_EXCHANGES_KEY, create_exchanges);
+ }
+
+ //Should perhaps error if parameters[0] is null;
+ AMQShortString exchangeName = parameters.length > 0 ? (AMQShortString) parameters[0] : null;
+ AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
+
+ //Store the exchangeName / class mapping if the mapping is null
+ createRights.put(exchangeName, className);
+ }
+ break;
+ case DELETE:
+ break;
+
+ case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
+ Map publishRights = (Map) _permissions.get(permission);
+
+ if (publishRights == null)
+ {
+ publishRights = new ConcurrentHashMap();
+ _permissions.put(permission, publishRights);
+ }
+
+ if (parameters == null || parameters.length == 0)
+ {
+ //If we have no parameters then allow publish to all destinations
+ // this is signified by having a null value for publish_exchanges
+ }
+ else
+ {
+ Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+ if (publish_exchanges == null)
+ {
+ publish_exchanges = new ConcurrentHashMap();
+ publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges);
+ }
+
+
+ HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]);
+
+ // Check to see if we have a routing key
+ if (parameters.length == 2)
+ {
+ if (routingKeys == null)
+ {
+ routingKeys = new HashSet<AMQShortString>();
+ }
+ //Add routing key to permitted publish destinations
+ routingKeys.add(parameters[1]);
+ }
+
+ // Add the updated routingkey list or null if all values allowed
+ publish_exchanges.put(parameters[0], routingKeys);
+ }
+ break;
+ case PURGE:
+ break;
+ case UNBIND:
+ break;
+ }
+
+ }
+
+ public boolean authorise(Permission permission, Object... parameters)
+ {
+
+ switch (permission)
+ {
+ case ACCESS:
+ return true; // This is here for completeness but the SimpleXML ACLManager never calls it.
+ // The existence of this user specific PP can be validated in the map SimpleXML maintains.
+ case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey
+
+ Exchange exchange = (Exchange) parameters[1];
+
+ AMQQueue bind_queueName = (AMQQueue) parameters[2];
+ AMQShortString routingKey = (AMQShortString) parameters[3];
+
+ //Get all Create Rights for this user
+ Map bindCreateRights = (Map) _permissions.get(Permission.CREATE);
+
+ //Look up the Queue Creation Rights
+ Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY);
+
+ //Lookup the list of queues
+ Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY);
+
+ // Check and see if we have a queue white list to check
+ if (bind_create_queues_queues != null)
+ {
+ //There a white list for queues
+ Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName);
+
+ if (exchangeDetails == null) //Then all queue can be bound to all exchanges.
+ {
+ return true;
+ }
+
+ // Check to see if we have a white list of routingkeys to check
+ Map rkeys = (Map) exchangeDetails.get(exchange.getName());
+
+ // if keys is null then any rkey is allowed on this exchange
+ if (rkeys == null)
+ {
+ // There is no routingkey white list
+ return true;
+ }
+ else
+ {
+ // We have routingKeys so a match must be found to allowed binding
+ Iterator keys = rkeys.keySet().iterator();
+
+ boolean matched = false;
+ while (keys.hasNext() && !matched)
+ {
+ AMQShortString rkey = (AMQShortString) keys.next();
+ if (rkey.endsWith("*"))
+ {
+ matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString());
+ }
+ else
+ {
+ matched = routingKey.equals(rkey);
+ }
+ }
+
+
+ return matched;
+ }
+
+
+ }
+ else
+ {
+ //There a is no white list for queues
+
+ // So can allow all queues to be bound
+ // but we should first check and see if we have a temp queue and validate that we are allowed
+ // to bind temp queues.
+
+ //Check to see if we have a temporary queue
+ if (bind_queueName.isAutoDelete())
+ {
+ // Check and see if we have an exchange white list.
+ Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+ // If the exchange exists then we must check to see if temporary queues are allowed here
+ if (bind_exchanges != null)
+ {
+ // Check to see if the requested exchange is allowed.
+ Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName());
+
+ return (Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY);
+ }
+
+ //no white list so all allowed, drop through to return true below.
+ }
+
+ // not a temporary queue and no white list so all allowed.
+ return true;
+ }
+
+ case CREATE:// Paramters : QueueDeclareBody || ExchangeDeclareBody
+
+ Map createRights = (Map) _permissions.get(permission);
+
+ // If there are no create rights then deny request
+ if (createRights == null)
+ {
+ return false;
+ }
+
+ if (parameters.length == 1)
+ {
+ if (parameters[0] instanceof QueueDeclareBody)
+ {
+ QueueDeclareBody body = (QueueDeclareBody) parameters[0];
+
+ //Look up the Queue Creation Rights
+ Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+ //Lookup the list of queues allowed to be created
+ Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+
+ AMQShortString queueName = body.getQueue();
+
+
+ if (body.getAutoDelete())// we have a temporary queue
+ {
+ return (Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY);
+ }
+ else
+ {
+ // If there is a white list then check
+ return create_queues_queues == null || create_queues_queues.containsKey(queueName);
+ }
+
+ }
+ else if (parameters[0] instanceof ExchangeDeclareBody)
+ {
+ ExchangeDeclareBody body = (ExchangeDeclareBody) parameters[0];
+
+ AMQShortString exchangeName = body.getExchange();
+
+ Map create_exchanges = (Map) createRights.get(CREATE_EXCHANGES_KEY);
+
+ // If the exchange list is doesn't exist then all is allowed else check the valid exchanges
+ return create_exchanges == null || create_exchanges.containsKey(exchangeName);
+ }
+ }
+ break;
+ case CONSUME: // Parameters : AMQQueue
+
+ if (parameters.length == 1 && parameters[0] instanceof AMQQueue)
+ {
+ AMQQueue queue = ((AMQQueue) parameters[0]);
+ Map queuePermissions = (Map) _permissions.get(permission);
+
+ List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY);
+
+ Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY);
+ Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY);
+
+ // If user is allowed to publish to temporary queues and this is a temp queue then allow it.
+ if (temporayQueues)
+ {
+ if (queue.isAutoDelete())
+ // This will allow consumption from any temporary queue including ones not owned by this user.
+ // Of course the exclusivity will not be broken.
+ {
+ // if not limited to ownQueuesOnly then ok else check queue Owner.
+ return !ownQueuesOnly || queue.getOwner().equals(_user);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ // if queues are white listed then ensure it is ok
+ if (queues != null)
+ {
+ // if no queues are listed then ALL are ok othereise it must be specified.
+ if (ownQueuesOnly)
+ {
+ if (queue.getOwner().equals(_user))
+ {
+ return queues.size() == 0 || queues.contains(queue.getName());
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ // If we are
+ return queues.size() == 0 || queues.contains(queue.getName());
+ }
+ }
+
+ // Can't authenticate without the right parameters
+ return false;
+ case DELETE:
+ break;
+
+ case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
+ Map publishRights = (Map) _permissions.get(permission);
+
+ if (publishRights == null)
+ {
+ return false;
+ }
+
+ Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+ // Having no exchanges listed gives full publish rights to all exchanges
+ if (exchanges == null)
+ {
+ return true;
+ }
+ // Otherwise exchange must be listed in the white list
+
+ // If the map doesn't have the exchange then it isn't allowed
+ if (!exchanges.containsKey(parameters[0]))
+ {
+ return false;
+ }
+ else
+ {
+
+ // Get valid routing keys
+ HashSet routingKeys = (HashSet) exchanges.get(parameters[0]);
+
+ // Having no routingKeys in the map then all are allowed.
+ if (routingKeys == null)
+ {
+ return true;
+ }
+ else
+ {
+ // We have routingKeys so a match must be found to allowed binding
+ Iterator keys = routingKeys.iterator();
+
+
+ AMQShortString publishRKey = (AMQShortString)parameters[1];
+
+ boolean matched = false;
+ while (keys.hasNext() && !matched)
+ {
+ AMQShortString rkey = (AMQShortString) keys.next();
+
+ if (rkey.endsWith("*"))
+ {
+ matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1));
+ }
+ else
+ {
+ matched = publishRKey.equals(rkey);
+ }
+ }
+ return matched;
+ }
+ }
+ case PURGE:
+ break;
+ case UNBIND:
+ break;
+
+ }
+
+ return false;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java
index 2dc7fcbc1e..a8ae03cc5d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java
@@ -18,7 +18,7 @@
*
*
*/
-package org.apache.qpid.server.security.access;
+package org.apache.qpid.server.security.access.management;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
@@ -26,6 +26,7 @@ import org.apache.qpid.server.management.MBeanOperation;
import org.apache.qpid.server.management.MBeanInvocationHandlerImpl;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.access.management.UserManagement;
import org.apache.log4j.Logger;
import org.apache.commons.configuration.ConfigurationException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java
index b8762aa43b..658d7ebbd3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java
@@ -18,7 +18,7 @@
*
*
*/
-package org.apache.qpid.server.security.access;
+package org.apache.qpid.server.security.access.management;
import org.apache.qpid.server.management.MBeanOperation;
import org.apache.qpid.server.management.MBeanOperationParameter;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
new file mode 100644
index 0000000000..9b784069dd
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access.plugins;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.security.access.AccessResult;
+import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.access.Permission;
+import org.apache.commons.configuration.Configuration;
+
+public class AllowAll implements ACLPlugin
+{
+ public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters)
+ {
+ if (ACLManager.getLogger().isDebugEnabled())
+ {
+ ACLManager.getLogger().debug("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString()
+ + " on " + body.getClass().getSimpleName()
+ + (parameters == null || parameters.length == 0 ? "" : "-" + accessablesToString(parameters)));
+ }
+
+ return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+ }
+
+ public static String accessablesToString(Object[] accessObject)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ for (Object access : accessObject)
+ {
+ sb.append(access.getClass().getSimpleName() + ":" + access.toString() + ", ");
+ }
+
+ return sb.delete(sb.length() - 2, sb.length()).toString();
+ }
+
+ public String getPluginName()
+ {
+ return "AllowAll";
+ }
+
+ public void setConfiguaration(Configuration config)
+ {
+ //no-op
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
new file mode 100644
index 0000000000..80c125e737
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access.plugins;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.AccessResult;
+import org.apache.qpid.server.security.access.Permission;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.commons.configuration.Configuration;
+
+public class DenyAll implements ACLPlugin
+{
+ public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException
+ {
+
+ if (ACLManager.getLogger().isInfoEnabled())
+ {
+ }
+ ACLManager.getLogger().info("Denying user:" + session.getAuthorizedID() + " for :" + permission.toString()
+ + " on " + body.getClass().getSimpleName()
+ + (parameters == null || parameters.length == 0 ? "" : "-" + AllowAll.accessablesToString(parameters)));
+
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "DenyAll Plugin");
+ }
+
+ public String getPluginName()
+ {
+ return "DenyAll";
+ }
+
+ public void setConfiguaration(Configuration config)
+ {
+ //no-op
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
new file mode 100644
index 0000000000..251f4e6330
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.server.security.access.plugins;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicPublishBody;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.AccessResult;
+import org.apache.qpid.server.security.access.Permission;
+import org.apache.qpid.server.security.access.PrincipalPermissions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This uses the default
+ */
+public class SimpleXML implements ACLPlugin
+{
+ private Map<String, PrincipalPermissions> _users;
+ private final AccessResult GRANTED = new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+
+ public SimpleXML()
+ {
+ _users = new ConcurrentHashMap<String, PrincipalPermissions>();
+ }
+
+ public void setConfiguaration(Configuration config)
+ {
+ processConfig(config);
+ }
+
+ private void processConfig(Configuration config)
+ {
+ processPublish(config);
+
+ processConsume(config);
+
+ processCreate(config);
+ }
+
+ /**
+ * Publish format takes
+ * Exchange + Routing Key Pairs
+ *
+ * @param config XML Configuration
+ */
+ private void processPublish(Configuration config)
+ {
+ Configuration publishConfig = config.subset("security.access_control_list.publish");
+
+ //Process users that have full publish permission
+ String[] users = publishConfig.getStringArray("users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.PUBLISH, user);
+ }
+
+ // Process exchange limited users
+ int exchangeCount = 0;
+ Configuration exchangeConfig = publishConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+
+ while (!exchangeConfig.isEmpty())
+ {
+ //Get Exchange Name
+ AMQShortString exchangeName = new AMQShortString(exchangeConfig.getString("name"));
+
+ //Get Routing Keys
+ int keyCount = 0;
+ Configuration routingkeyConfig = exchangeConfig.subset("routing_keys.routing_key(" + keyCount + ")");
+
+ while (!routingkeyConfig.isEmpty())
+ {
+ //Get RoutingKey Value
+ AMQShortString routingKeyValue = new AMQShortString(routingkeyConfig.getString("value"));
+
+ //Apply Exchange + RoutingKey permissions to Users
+ users = routingkeyConfig.getStringArray("users.user");
+ for (String user : users)
+ {
+ grant(Permission.PUBLISH, user, exchangeName, routingKeyValue);
+ }
+
+ //Apply permissions to Groups
+
+ // Check for more configs
+ keyCount++;
+ routingkeyConfig = exchangeConfig.subset("routing_keys.routing_key(" + keyCount + ")");
+ }
+
+ //Apply Exchange wide permissions to Users
+ users = exchangeConfig.getStringArray("exchange(" + exchangeCount + ").users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.PUBLISH, user, exchangeName);
+ }
+
+ //Apply permissions to Groups
+ exchangeCount++;
+ exchangeConfig = publishConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+ }
+ }
+
+ private void grant(Permission permission, String user, Object... parameters)
+ {
+ PrincipalPermissions permissions = _users.get(user);
+
+ if (permissions == null)
+ {
+ permissions = new PrincipalPermissions(user);
+ }
+
+ _users.put(user, permissions);
+ permissions.grant(permission, parameters);
+ }
+
+ private void processConsume(Configuration config)
+ {
+ Configuration consumeConfig = config.subset("security.access_control_list.consume");
+
+ // Process queue limited users
+ int queueCount = 0;
+ Configuration queueConfig = consumeConfig.subset("queues.queue(" + queueCount + ")");
+
+ while (!queueConfig.isEmpty())
+ {
+ //Get queue Name
+ AMQShortString queueName = new AMQShortString(queueConfig.getString("name"));
+ // if there is no name then there may be a temporary element
+ boolean temporary = queueConfig.containsKey("temporary");
+ boolean ownQueues = queueConfig.containsKey("own_queues");
+
+ //Process permissions for this queue
+ String[] users = queueConfig.getStringArray("users.user");
+ for (String user : users)
+ {
+ grant(Permission.CONSUME, user, queueName, temporary, ownQueues);
+ }
+
+ //See if we have another config
+ queueCount++;
+ queueConfig = consumeConfig.subset("queues.queue(" + queueCount + ")");
+ }
+
+ // Process users that have full consume permission
+ String[] users = consumeConfig.getStringArray("users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.CONSUME, user);
+ }
+ }
+
+ private void processCreate(Configuration config)
+ {
+ Configuration createConfig = config.subset("security.access_control_list.create");
+
+ // Process create permissions for queue creation
+ int queueCount = 0;
+ Configuration queueConfig = createConfig.subset("queues.queue(" + queueCount + ")");
+
+ while (!queueConfig.isEmpty())
+ {
+ //Get queue Name
+ AMQShortString queueName = new AMQShortString(queueConfig.getString("name"));
+
+ // if there is no name then there may be a temporary element
+ boolean temporary = queueConfig.containsKey("temporary");
+
+ int exchangeCount = 0;
+ Configuration exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+
+ while (!exchangeConfig.isEmpty())
+ {
+
+ AMQShortString exchange = new AMQShortString(exchangeConfig.getString("name"));
+ AMQShortString routingKey = new AMQShortString(exchangeConfig.getString("routing_key"));
+
+ //Process permissions for this queue
+ String[] users = exchangeConfig.getStringArray("users.user");
+ for (String user : users)
+ {
+ grant(Permission.CREATE, user, temporary,
+ (queueName.equals("") ? null : queueName),
+ (exchange.equals("") ? null : exchange),
+ (routingKey.equals("") ? null : routingKey));
+ }
+
+ //See if we have another config
+ exchangeCount++;
+ exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+ }
+
+ // Process users that are not bound to an exchange
+ String[] users = queueConfig.getStringArray("users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.CREATE, user, temporary, queueName);
+ }
+
+ //See if we have another config
+ queueCount++;
+ queueConfig = createConfig.subset("queues.queue(" + queueCount + ")");
+ }
+
+ // Process create permissions for exchange creation
+ int exchangeCount = 0;
+ Configuration exchangeConfig = createConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+
+ while (!exchangeConfig.isEmpty())
+ {
+ AMQShortString exchange = new AMQShortString(exchangeConfig.getString("name"));
+ AMQShortString clazz = new AMQShortString(exchangeConfig.getString("class"));
+
+ //Process permissions for this queue
+ String[] users = exchangeConfig.getStringArray("users.user");
+ for (String user : users)
+ {
+ grant(Permission.CREATE, user, exchange, clazz);
+ }
+
+ //See if we have another config
+ exchangeCount++;
+ exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+ }
+
+ // Process users that have full create permission
+ String[] users = createConfig.getStringArray("users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.CREATE, user);
+ }
+
+
+ }
+
+ public String getPluginName()
+ {
+ return "Simple";
+ }
+
+ public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException
+ {
+ String error = "";
+
+ if (ACLManager.getLogger().isInfoEnabled())
+ {
+ ACLManager.getLogger().info("Simple Authorisation processing user:" + session.getAuthorizedID() + " for :" + permission.toString()
+ + " on " + body.getClass().getSimpleName()
+ + (parameters == null || parameters.length == 0 ? "" : "-" + AllowAll.accessablesToString(parameters)));
+ }
+
+ String username = session.getAuthorizedID().getName();
+
+ //Get the Users Permissions
+ PrincipalPermissions permissions = _users.get(username);
+
+ if (permissions != null)
+ {
+ switch (permission)
+ {
+ case ACCESS:
+ return GRANTED;
+ case BIND: // Body QueueDeclareBody - Parameters : Exchange, Queue, QueueName
+ // Body QueueBindBody - Paramters : Exchange, Queue, QueueName
+ if (parameters.length == 3)
+ {
+ // Parameters : Exchange, Queue, RoutingKey
+ if (permissions.authorise(Permission.BIND, body, parameters[0], parameters[1], parameters[2]))
+ {
+ return GRANTED;
+ }
+ }
+ break;
+ case CONSUME: // Parameters : none
+ if (parameters.length == 1 && permissions.authorise(Permission.CONSUME, parameters[0]))
+ {
+ return GRANTED;
+ }
+ break;
+ case CREATE: // Body : QueueDeclareBody | ExchangeDeclareBody - Parameters : none
+ if (permissions.authorise(Permission.CREATE, body))
+ {
+ return GRANTED;
+ }
+ break;
+ case PUBLISH: // Body : BasicPublishBody Parameters : exchange
+ if (parameters.length == 1 && parameters[0] instanceof Exchange)
+ {
+ if (permissions.authorise(Permission.PUBLISH, ((Exchange) parameters[0]).getName(),
+ ((BasicPublishBody) body).getRoutingKey()))
+ {
+ return GRANTED;
+ }
+ }
+ break;
+ case PURGE:
+ break;
+ case DELETE:
+ break;
+ case UNBIND:
+ break;
+ }
+ }
+
+ //todo potential refactor this ConnectionException Out of here
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
index 10adfdd9fc..348bccb4e9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
-import org.apache.qpid.server.security.access.AMQUserManagementMBean;
+import org.apache.qpid.server.security.access.management.AMQUserManagementMBean;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.EncoderException;
@@ -45,7 +45,6 @@ import java.util.LinkedList;
import java.util.concurrent.locks.ReentrantLock;
import java.security.Principal;
import java.security.NoSuchAlgorithmException;
-import java.security.MessageDigest;
/**
* Represents a user database where the account information is stored in a simple flat file.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
index 2d3f5e5131..15c62a62e4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
@@ -37,7 +37,7 @@ import org.apache.qpid.configuration.PropertyException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
-import org.apache.qpid.server.security.access.AMQUserManagementMBean;
+import org.apache.qpid.server.security.access.management.AMQUserManagementMBean;
import org.apache.qpid.AMQException;
import javax.management.JMException;
@@ -50,17 +50,16 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
Map<String, PrincipalDatabase> _databases;
- public ConfigurationFilePrincipalDatabaseManager() throws Exception
+ public ConfigurationFilePrincipalDatabaseManager(Configuration configuration) throws Exception
{
_logger.info("Initialising PrincipleDatabase authentication manager");
- _databases = initialisePrincipalDatabases();
+ _databases = initialisePrincipalDatabases(configuration);
}
- private Map<String, PrincipalDatabase> initialisePrincipalDatabases() throws Exception
+ private Map<String, PrincipalDatabase> initialisePrincipalDatabases(Configuration configuration) throws Exception
{
- Configuration config = ApplicationRegistry.getInstance().getConfiguration();
- List<String> databaseNames = config.getList(_base + ".name");
- List<String> databaseClasses = config.getList(_base + ".class");
+ List<String> databaseNames = configuration.getList(_base + ".name");
+ List<String> databaseClasses = configuration.getList(_base + ".class");
Map<String, PrincipalDatabase> databases = new HashMap<String, PrincipalDatabase>();
if (databaseNames.size() == 0)
@@ -85,7 +84,7 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
throw new Exception("Principal databases must implement the PrincipalDatabase interface");
}
- initialisePrincipalDatabase((PrincipalDatabase) o, config, i);
+ initialisePrincipalDatabase((PrincipalDatabase) o, configuration, i);
String name = databaseNames.get(i);
if ((name == null) || (name.length() == 0))
@@ -200,7 +199,7 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
}
String jmxaccesssFile = null;
-
+
try
{
jmxaccesssFile = PropertyUtils.replaceProperties(jmxaccesslist.get(0));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java
deleted file mode 100644
index 5c372f6c2c..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.security.auth.database;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessResult;
-import org.apache.qpid.server.security.access.AccessRights;
-import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.security.Principal;
-
-/**
- * Represents a user database where the account information is stored in a simple flat file.
- *
- * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn
- *
- * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text.
- */
-public class PlainPasswordVhostFilePrincipalDatabase extends PlainPasswordFilePrincipalDatabase implements AccessManager
-{
- private static final Logger _logger = Logger.getLogger(PlainPasswordVhostFilePrincipalDatabase.class);
-
- /**
- * Looks up the virtual hosts for a specified user in the password file.
- *
- * @param user The user to lookup
- *
- * @return a list of virtualhosts
- */
- private String[] lookupVirtualHost(String user)
- {
- try
- {
- BufferedReader reader = null;
- try
- {
- reader = new BufferedReader(new FileReader(_passwordFile));
- String line;
-
- while ((line = reader.readLine()) != null)
- {
- if (!line.startsWith("#"))
- {
- String[] result = _regexp.split(line);
- if (result == null || result.length < 3)
- {
- continue;
- }
-
- if (user.equals(result[0]))
- {
- return result[2].split(",");
- }
- }
- }
- return null;
- }
- finally
- {
- if (reader != null)
- {
- reader.close();
- }
- }
- }
- catch (IOException ioe)
- {
- //ignore
- }
- return null;
- }
-
-
- public AccessResult isAuthorized(Accessable accessObject, String username)
- {
- return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ);
- }
-
- public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights)
- {
-
- if (accessObject instanceof VirtualHost)
- {
- String[] hosts = lookupVirtualHost(user.getName());
-
- if (hosts != null)
- {
- for (String host : hosts)
- {
- if (accessObject.getAccessableName().equals(host))
- {
- return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
- }
- }
- }
- }
-
- return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
- }
-
- public String getName()
- {
- return "PlainPasswordVhostFile";
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
index 73d58ca489..c8a4add0f1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
@@ -69,10 +69,14 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase
{
throw new IllegalArgumentException("principal must not be null");
}
- char[] pwd = _users.getProperty(principal.getName()).toCharArray();
+
+
+
+ final String pwd = _users.getProperty(principal.getName());
+
if (pwd != null)
{
- callback.setPassword(pwd);
+ callback.setPassword(pwd.toCharArray());
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index ce5e0cd748..f589140e8e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -82,14 +82,6 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan
if (databaseName == null)
{
- if (hostConfig instanceof SubsetConfiguration)
- {
- _logger.warn("No authentication specified for '" + ((SubsetConfiguration) hostConfig).getPrefix() + "'. Using Default authentication manager");
- }
- else
- {
- _logger.warn("No authentication specified. Using Default authentication manager");
- }
_default = ApplicationRegistry.getInstance().getAuthenticationManager();
return;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
index 1dcbb02d5c..23aaf56876 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
@@ -94,7 +94,7 @@ public class ConnectorConfiguration
public String certType;
@Configured(path = "connector.qpidnio",
- defaultValue = "true")
+ defaultValue = "false")
public boolean _multiThreadNIO;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 047cef9064..1e4b69c935 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -49,11 +49,11 @@ public class NonTransactionalContext implements TransactionalContext
/** Where to put undeliverable messages */
private final List<RequiredDeliveryException> _returnMessages;
- private Set<Long> _browsedAcks;
+ private final Set<Long> _browsedAcks;
private final MessageStore _messageStore;
- private StoreContext _storeContext;
+ private final StoreContext _storeContext;
/** Whether we are in a transaction */
private boolean _inTran;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
index b4c66aa24d..6016ecc1a5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
@@ -24,7 +24,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index e9fa642175..0acfa84f31 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -35,8 +35,8 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AllowAll;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -48,7 +48,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
private VirtualHostRegistry _virtualHostRegistry;
- private AccessManager _accessManager;
+ private ACLPlugin _accessManager;
private PrincipalDatabaseManager _databaseManager;
@@ -116,7 +116,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
return _virtualHostRegistry;
}
- public AccessManager getAccessManager()
+ public ACLPlugin getAccessManager()
{
return _accessManager;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 8d6a26fdbc..3ff9b8c356 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -1,260 +1,308 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
-import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-
-public class VirtualHost implements Accessable
-{
- private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
-
- private final String _name;
-
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private MessageStore _messageStore;
-
- protected VirtualHostMBean _virtualHostMBean;
-
- private AMQBrokerManagerMBean _brokerMBean;
-
- private AuthenticationManager _authenticationManager;
-
- private AccessManager _accessManager;
-
-
- public void setAccessableName(String name)
- {
- _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
- + name + ") ignored remains :" + getAccessableName());
- }
-
- public String getAccessableName()
- {
- return _name;
- }
-
-
- /**
- * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
- {
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, "VirtualHost");
- }
-
- public String getObjectInstanceName()
- {
- return _name.toString();
- }
-
- public String getName()
- {
- return _name.toString();
- }
-
- public VirtualHost getVirtualHost()
- {
- return VirtualHost.this;
- }
-
-
- } // End of MBean class
-
- /**
- * Used for testing only
- * @param name
- * @param store
- * @throws Exception
- */
- public VirtualHost(String name, MessageStore store) throws Exception
- {
- this(name, null, store);
- }
-
- /**
- * Normal Constructor
- * @param name
- * @param hostConfig
- * @throws Exception
- */
- public VirtualHost(String name, Configuration hostConfig) throws Exception
- {
- this(name, hostConfig, null);
- }
-
- private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
- {
- _name = name;
-
- _virtualHostMBean = new VirtualHostMBean();
- // This isn't needed to be registered
- //_virtualHostMBean.register();
-
- _queueRegistry = new DefaultQueueRegistry(this);
- _exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeFactory.initialise(hostConfig);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
-
- if (store != null)
- {
- _messageStore = store;
- }
- else
- {
- if (hostConfig == null)
- {
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
- }
- initialiseMessageStore(hostConfig);
- }
-
- _exchangeRegistry.initialise();
-
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
-
- _accessManager = new AccessManagerImpl(name, hostConfig);
-
- _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _brokerMBean.register();
- }
-
- private void initialiseMessageStore(Configuration config) throws Exception
- {
- String messageStoreClass = config.getString("store.class");
-
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
-
- if (!(o instanceof MessageStore))
- {
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- _messageStore = (MessageStore) o;
- _messageStore.configure(this, "store", config);
- }
-
-
- public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
- {
- T instance;
- try
- {
- instance = instanceType.newInstance();
- }
- catch (Exception e)
- {
- _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
- }
- Configurator.configure(instance);
-
- return instance;
- }
-
-
- public String getName()
- {
- return _name;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public ApplicationRegistry getApplicationRegistry()
- {
- throw new UnsupportedOperationException();
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
- public AccessManager getAccessManager()
- {
- return _accessManager;
- }
-
- public void close() throws Exception
- {
- if (_messageStore != null)
- {
- _messageStore.close();
- }
- }
-
- public ManagedObject getBrokerMBean()
- {
- return _brokerMBean;
- }
-
- public ManagedObject getManagedObject()
- {
- return _virtualHostMBean;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import javax.management.NotCompliantMBeanException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.AMQException;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+public class VirtualHost implements Accessable
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+ private AuthenticationManager _authenticationManager;
+
+ private ACLPlugin _accessManager;
+
+ private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true);
+
+ private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
+
+ public void setAccessableName(String name)
+ {
+ _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
+ + name + ") ignored remains :" + getAccessableName());
+ }
+
+ public String getAccessableName()
+ {
+ return _name;
+ }
+
+
+ /**
+ * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
+ * implementaion of an Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+ /**
+ * Used for testing only
+ * @param name
+ * @param store
+ * @throws Exception
+ */
+ public VirtualHost(String name, MessageStore store) throws Exception
+ {
+ this(name, new PropertiesConfiguration(), store);
+ }
+
+ /**
+ * Normal Constructor
+ * @param name
+ * @param hostConfig
+ * @throws Exception
+ */
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ this(name, hostConfig, null);
+ }
+
+ public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ // This isn't needed to be registered
+ //_virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeFactory.initialise(hostConfig);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+ if (store != null)
+ {
+ _messageStore = store;
+ }
+ else
+ {
+ if (hostConfig == null)
+ {
+ throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+ }
+ initialiseMessageStore(hostConfig);
+ }
+
+ _exchangeRegistry.initialise();
+
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
+
+ _accessManager = ACLManager.loadACLManager(name, hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+ initialiseHouseKeeping(hostConfig);
+ }
+
+ private void initialiseHouseKeeping(final Configuration hostConfig)
+ {
+
+ long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
+
+ /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
+ if(period != 0L)
+ {
+ class RemoveExpiredMessagesTask extends TimerTask
+ {
+ public void run()
+ {
+ for(AMQQueue q : _queueRegistry.getQueues())
+ {
+
+ try
+ {
+ q.removeExpiredIfNoSubscribers();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
+ period/2,
+ period);
+ }
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, "store", config);
+ }
+
+
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+ T instance;
+ try
+ {
+ instance = instanceType.newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
+ }
+ Configurator.configure(instance);
+
+ return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public ACLPlugin getAccessManager()
+ {
+ return _accessManager;
+ }
+
+ public void close() throws Exception
+ {
+ if (_houseKeepingTimer != null)
+ {
+ _houseKeepingTimer.cancel();
+ }
+ if (_messageStore != null)
+ {
+ _messageStore.close();
+ }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java
new file mode 100644
index 0000000000..a0304a7b01
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java
@@ -0,0 +1,128 @@
+package org.apache.qpid.server;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.AMQException;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class SelectorParserTest extends TestCase
+{
+ public void testSelectorWithHyphen()
+ {
+ testPass("Cost = 2 AND \"property-with-hyphen\" = 'wibble'");
+ }
+
+ public void testLike()
+ {
+ testFail("Cost LIKE 2");
+ testPass("Cost LIKE 'Hello'");
+ }
+
+ public void testStringQuoted()
+ {
+ testPass("string = 'Test'");
+ }
+
+ public void testProperty()
+ {
+ testPass("prop1 = prop2");
+ }
+
+ public void testPropertyNames()
+ {
+ testPass("$min= TRUE AND _max= FALSE AND Prop_2 = true AND prop$3 = false");
+ }
+
+ public void testProtected()
+ {
+ testFail("NULL = 0 ");
+ testFail("TRUE = 0 ");
+ testFail("FALSE = 0 ");
+ testFail("NOT = 0 ");
+ testFail("AND = 0 ");
+ testFail("OR = 0 ");
+ testFail("BETWEEN = 0 ");
+ testFail("LIKE = 0 ");
+ testFail("IN = 0 ");
+ testFail("IS = 0 ");
+ testFail("ESCAPE = 0 ");
+ }
+
+
+ public void testBoolean()
+ {
+ testPass("min= TRUE AND max= FALSE ");
+ testPass("min= true AND max= false");
+ }
+
+ public void testDouble()
+ {
+ testPass("positive=31E2 AND negative=-31.4E3");
+ testPass("min=" + Double.MIN_VALUE + " AND max=" + Double.MAX_VALUE);
+ }
+
+ public void testLong()
+ {
+ testPass("minLong=" + Long.MIN_VALUE + "L AND maxLong=" + Long.MAX_VALUE + "L");
+ }
+
+ public void testInt()
+ {
+ testPass("minInt=" + Integer.MIN_VALUE + " AND maxInt=" + Integer.MAX_VALUE);
+ }
+
+ public void testSigned()
+ {
+ testPass("negative=-42 AND positive=+42");
+ }
+
+ public void testOctal()
+ {
+ testPass("octal=042");
+ }
+
+
+ private void testPass(String selector)
+ {
+ try
+ {
+ new JMSSelectorFilter(selector);
+ }
+ catch (AMQException e)
+ {
+ fail("Selector '" + selector + "' was not parsed :" + e.getMessage());
+ }
+ }
+
+ private void testFail(String selector)
+ {
+ try
+ {
+ new JMSSelectorFilter(selector);
+ fail("Selector '" + selector + "' was parsed ");
+ }
+ catch (AMQException e)
+ {
+ //normal path
+ }
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index 8e5879a51e..7e2d56b460 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -589,6 +589,11 @@ public class DestWildExchangeTest extends TestCase
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
public boolean isImmediate()
{
return false;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 81b0ae2213..ed79384d42 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -173,7 +173,7 @@ public class AMQQueueAlertTest extends TestCase
public void testQueueDepthAlertWithSubscribers() throws Exception
{
protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
protocolSession.addChannel(channel);
// Create queue
@@ -242,6 +242,11 @@ public class AMQQueueAlertTest extends TestCase
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return immediate;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index d86c90bdae..c02b47e9fd 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -123,7 +123,7 @@ public class AMQQueueMBeanTest extends TestCase
TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
_queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
@@ -234,6 +234,11 @@ public class AMQQueueMBeanTest extends TestCase
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return immediate;