diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-21 13:55:26 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-21 13:55:26 +0000 |
commit | 1d69ea16b30dd67ac32683e6dc512f4c58ef93f1 (patch) | |
tree | b7d326570570b53936c6512d58c465c76244e925 /java/broker/src | |
parent | a53cbaad17df415e98f22cc42f2512467936bbc6 (diff) | |
parent | c0c3c38f032200e786cf5a4404cfa40a0c95f5e8 (diff) | |
download | qpid-python-1d69ea16b30dd67ac32683e6dc512f4c58ef93f1.tar.gz |
create branch for broker refactoring
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@650146 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
70 files changed, 2744 insertions, 1522 deletions
diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj index adec1b348d..f6a843e080 100644 --- a/java/broker/src/main/grammar/SelectorParser.jj +++ b/java/broker/src/main/grammar/SelectorParser.jj @@ -172,6 +172,7 @@ TOKEN [IGNORE_CASE] : TOKEN [IGNORE_CASE] :
{
< ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* >
+ | < QUOTED_ID : "\"" ( ("\"\"") | ~["\""] )* "\"" >
}
// ----------------------------------------------------------------------------
@@ -589,6 +590,7 @@ String stringLitteral() : PropertyExpression variable() :
{
Token t;
+ StringBuffer rc = new StringBuffer();
PropertyExpression left=null;
}
{
@@ -597,6 +599,21 @@ PropertyExpression variable() : {
left = new PropertyExpression(t.image);
}
+ |
+ t = <QUOTED_ID>
+ {
+ // Decode the sting value.
+ String image = t.image;
+ for( int i=1; i < image.length()-1; i++ ) {
+ char c = image.charAt(i);
+ if( c == '"' )
+ i++;
+ rc.append(c);
+ }
+ return new PropertyExpression(rc.toString());
+ }
+
+
)
{
return left;
diff --git a/java/broker/src/main/java/log4j.properties b/java/broker/src/main/java/log4j.properties index 87f04f4991..6788c65463 100644 --- a/java/broker/src/main/java/log4j.properties +++ b/java/broker/src/main/java/log4j.properties @@ -19,6 +19,6 @@ log4j.rootCategory=${amqj.logging.level}, console log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=info +log4j.appender.console.Threshold=all log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 4696ec4453..17b4fa5d65 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -33,8 +33,9 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.exchange.MessageRouter; +import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.exchange.NoRouteException; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.store.MessageStore; @@ -42,17 +43,15 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.configuration.Configurator; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; public class AMQChannel { @@ -74,7 +73,7 @@ public class AMQChannel * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the <b>last</b> tag sent out */ - private AtomicLong _deliveryTag = new AtomicLong(0); + private long _deliveryTag = 0; /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ private AMQQueue _defaultQueue; @@ -90,7 +89,7 @@ public class AMQChannel private AMQMessage _currentMessage; /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>(); + private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private final MessageStore _messageStore; @@ -98,8 +97,6 @@ public class AMQChannel private final AtomicBoolean _suspended = new AtomicBoolean(false); - private final MessageRouter _exchanges; - private TransactionalContext _txnContext, _nonTransactedContext; /** @@ -119,11 +116,11 @@ public class AMQChannel private boolean _closing; @Configured(path = "advanced.enableJMSXUserID", - defaultValue = "true") + defaultValue = "false") public boolean ENABLE_JMSXUserID; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { //Set values from configuration @@ -135,7 +132,7 @@ public class AMQChannel _prefetch_HighWaterMark = DEFAULT_PREFETCH; _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; - _exchanges = exchanges; + // by default the session is non-transactional _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } @@ -199,11 +196,12 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } - public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException + public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException { _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext); _currentMessage.setPublisher(publisher); + _currentMessage.setExchange(e); } public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession) @@ -215,9 +213,9 @@ public class AMQChannel } else { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Content header received on channel " + _channelId); + _log.debug(debugIdentity() + "Content header received on channel " + _channelId); } if (ENABLE_JMSXUserID) @@ -252,9 +250,9 @@ public class AMQChannel throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Content body received on channel " + _channelId); + _log.debug(debugIdentity() + "Content body received on channel " + _channelId); } try @@ -285,7 +283,7 @@ public class AMQChannel { try { - _exchanges.routeContent(_currentMessage); + _currentMessage.route(); } catch (NoRouteException e) { @@ -295,7 +293,7 @@ public class AMQChannel public long getNextDeliveryTag() { - return _deliveryTag.incrementAndGet(); + return ++_deliveryTag; } public int getNextConsumerTag() @@ -333,13 +331,32 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); + // We add before we register as the Async Delivery process may AutoClose the subscriber + // so calling _cT2QM.remove before we have done put which was after the register succeeded. + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. _consumerTag2QueueMap.put(tag, queue); + try + { + queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); + } + catch (AMQException e) + { + _consumerTag2QueueMap.remove(tag); + throw e; + } + return tag; } - public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException + /** + * Unsubscribe a consumer from a queue. + * @param session + * @param consumerTag + * @return true if the consumerTag had a mapped queue that could be unregistered. + * @throws AMQException + */ + public boolean unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException { if (_log.isDebugEnabled()) { @@ -364,7 +381,9 @@ public class AMQChannel if (q != null) { q.unregisterProtocolSession(session, _channelId, consumerTag); + return true; } + return false; } /** @@ -822,7 +841,7 @@ public class AMQChannel { message.discard(_storeContext); message.setQueueDeleted(true); - + } catch (AMQException e) { @@ -967,16 +986,19 @@ public class AMQChannel public void processReturns(AMQProtocolSession session) throws AMQException { - for (RequiredDeliveryException bouncedMessage : _returnMessages) + if (!_returnMessages.isEmpty()) { - AMQMessage message = bouncedMessage.getAMQMessage(); - session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + for (RequiredDeliveryException bouncedMessage : _returnMessages) + { + AMQMessage message = bouncedMessage.getAMQMessage(); + session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); - message.decrementReference(_storeContext); - } + message.decrementReference(_storeContext); + } - _returnMessages.clear(); + _returnMessages.clear(); + } } public boolean wouldSuspend(AMQMessage msg) diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index ab9f40b31d..d8a8cfb6d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -35,6 +35,7 @@ import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.FixedSizeByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQException; @@ -275,7 +276,7 @@ public class Main // once more testing of the performance of the simple allocator has been done if (!connectorConfig.enablePooledAllocator) { - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); } int port = connectorConfig.port; diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index ac29998c2a..c62a7880a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -100,10 +100,9 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (UnacknowledgedMessage msg : _unacked) { - //msg.restoreTransientMessageData(); - //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); + } } @@ -115,7 +114,6 @@ public class TxAck implements TxnOp //in memory (persistent changes will be rolled back by store) for (UnacknowledgedMessage msg : _unacked) { - msg.clearTransientMessageData(); msg.getMessage().takeReference(); } } @@ -124,11 +122,6 @@ public class TxAck implements TxnOp { //remove the unacked messages from the channels map _map.remove(_unacked); - for (UnacknowledgedMessage msg : _unacked) - { - msg.clearTransientMessageData(); - } - } public void rollback(StoreContext storeContext) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 40f5970cac..df7cecc940 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -68,16 +68,6 @@ public class UnacknowledgedMessage entry.getMessage().decrementReference(storeContext); } - public void restoreTransientMessageData() throws AMQException - { - entry.getMessage().restoreTransientMessageData(); - } - - public void clearTransientMessageData() - { - entry.getMessage().clearTransientMessageData(); - } - public AMQMessage getMessage() { return entry.getMessage(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index b6b6ee39ce..12347c0278 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -212,10 +212,9 @@ public class DestNameExchange extends AbstractExchange _logger.debug("Publishing message to queue " + queues); } - for (AMQQueue q : queues) - { - payload.enqueue(q); - } + payload.enqueue(queues); + + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 75be86a387..6fa3686152 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortStringTokenizer; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; @@ -40,11 +41,7 @@ import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -84,12 +81,21 @@ public class DestWildExchange extends AbstractExchange private static final Logger _logger = Logger.getLogger(DestWildExchange.class); - private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); - // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); - private static final String TOPIC_SEPARATOR = "."; - private static final String AMQP_STAR = "*"; - private static final String AMQP_HASH = "#"; + + private static final byte TOPIC_SEPARATOR = (byte)'.'; + private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString("."); + private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*"); + private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#"); + private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized = + new ConcurrentHashMap<AMQShortString, AMQShortString[]>(); + private static final byte HASH_BYTE = (byte)'#'; + private static final byte STAR_BYTE = (byte)'*'; /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */ @MBeanDescription("Management Bean for Topic Exchange") @@ -107,7 +113,7 @@ public class DestWildExchange extends AbstractExchange public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); - for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _routingKey2queues.entrySet()) + for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet()) { AMQShortString key = entry.getKey(); List<String> queueList = new ArrayList<String>(); @@ -156,27 +162,75 @@ public class DestWildExchange extends AbstractExchange assert queue != null; assert rKey != null; - AMQShortString routingKey = normalize(rKey); + _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey); - _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey); // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition - List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); + List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>()); + + + + + + + // if we got null back, no previous value was associated with the specified routing key hence // we need to read back the new value just put into the map if (queueList == null) { - queueList = _routingKey2queues.get(routingKey); + queueList = _bindingKey2queues.get(rKey); } + + if (!queueList.contains(queue)) { queueList.add(queue); + + + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + { + AMQShortString routingKey = normalize(rKey); + List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); + + if(queueList2 == null) + { + queueList2 = _wildCardBindingKey2queues.get(routingKey); + AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR); + + ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens()); + + while (keyTok.hasMoreTokens()) + { + keyTokList.add(keyTok.nextToken()); + } + + _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()])); + } + queueList2.add(queue); + + } + else + { + List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>()); + if(queueList2 == null) + { + queueList2 = _simpleBindingKey2queues.get(rKey); + } + queueList2.add(queue); + + } + + + + } else if (_logger.isDebugEnabled()) { - _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); + _logger.debug("Queue " + queue + " is already registered with routing key " + rKey); } + + } private AMQShortString normalize(AMQShortString routingKey) @@ -186,60 +240,55 @@ public class DestWildExchange extends AbstractExchange routingKey = AMQShortString.EMPTY_STRING; } - StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); - List<String> _subscription = new ArrayList<String>(); + AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); + + List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>(); while (routingTokens.hasMoreTokens()) { - _subscription.add(routingTokens.nextToken()); + subscriptionList.add(routingTokens.nextToken()); } - int size = _subscription.size(); + int size = subscriptionList.size(); for (int index = 0; index < size; index++) { // if there are more levels if ((index + 1) < size) { - if (_subscription.get(index).equals(AMQP_HASH)) + if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN)) { - if (_subscription.get(index + 1).equals(AMQP_HASH)) + if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN)) { // we don't need #.# delete this one - _subscription.remove(index); + subscriptionList.remove(index); size--; // redo this normalisation index--; } - if (_subscription.get(index + 1).equals(AMQP_STAR)) + if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN)) { // we don't want #.* swap to *.# // remove it and put it in at index + 1 - _subscription.add(index + 1, _subscription.remove(index)); + subscriptionList.add(index + 1, subscriptionList.remove(index)); } } } // if we have more levels } - StringBuilder sb = new StringBuilder(); - for (String s : _subscription) - { - sb.append(s); - sb.append(TOPIC_SEPARATOR); - } - sb.deleteCharAt(sb.length() - 1); + AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING); - return new AMQShortString(sb.toString()); + return normalizedString; } public void route(AMQMessage payload) throws AMQException { MessagePublishInfo info = payload.getMessagePublishInfo(); - final AMQShortString routingKey = normalize(info.getRoutingKey()); + final AMQShortString routingKey = info.getRoutingKey(); List<AMQQueue> queues = getMatchedQueues(routingKey); // if we have no registered queues we have nothing to do @@ -254,19 +303,14 @@ public class DestWildExchange extends AbstractExchange else { _logger.warn("No queues found for routing key " + routingKey); - _logger.warn("Routing map contains: " + _routingKey2queues); + _logger.warn("Routing map contains: " + _bindingKey2queues); return; } } - for (AMQQueue q : queues) - { - // TODO: modify code generator to add clone() method then clone the deliver body - // without this addition we have a race condition - we will be modifying the body - // before the encoder has encoded the body for delivery - payload.enqueue(q); - } + payload.enqueue(queues); + } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) @@ -276,21 +320,21 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); + List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey)); return (queues != null) && queues.contains(queue); } public boolean isBound(AMQShortString routingKey) { - List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); + List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey)); return (queues != null) && !queues.isEmpty(); } public boolean isBound(AMQQueue queue) { - for (List<AMQQueue> queues : _routingKey2queues.values()) + for (List<AMQQueue> queues : _bindingKey2queues.values()) { if (queues.contains(queue)) { @@ -303,7 +347,7 @@ public class DestWildExchange extends AbstractExchange public boolean hasBindings() { - return !_routingKey2queues.isEmpty(); + return !_bindingKey2queues.isEmpty(); } public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException @@ -311,13 +355,11 @@ public class DestWildExchange extends AbstractExchange assert queue != null; assert rKey != null; - AMQShortString routingKey = normalize(rKey); - - List<AMQQueue> queues = _routingKey2queues.get(routingKey); + List<AMQQueue> queues = _bindingKey2queues.get(rKey); if (queues == null) { throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey + ". No queue was registered with that _routing key"); + + " with routing key " + rKey + ". No queue was registered with that _routing key"); } @@ -325,12 +367,39 @@ public class DestWildExchange extends AbstractExchange if (!removedQ) { throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey); + + " with routing key " + rKey); } + + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + { + AMQShortString bindingKey = normalize(rKey); + List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey); + queues2.remove(queue); + if(queues2.isEmpty()) + { + _wildCardBindingKey2queues.remove(bindingKey); + _bindingKey2Tokenized.remove(bindingKey); + } + + } + else + { + List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey); + queues2.remove(queue); + if(queues2.isEmpty()) + { + _simpleBindingKey2queues.remove(rKey); + } + + } + + + + if (queues.isEmpty()) { - _routingKey2queues.remove(routingKey); + _bindingKey2queues.remove(rKey); } } @@ -349,117 +418,162 @@ public class DestWildExchange extends AbstractExchange public Map<AMQShortString, List<AMQQueue>> getBindings() { - return _routingKey2queues; + return _bindingKey2queues; } private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) { - List<AMQQueue> list = new LinkedList<AMQQueue>(); - StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); - ArrayList<String> routingkeyList = new ArrayList<String>(); + List<AMQQueue> list = null; - while (routingTokens.hasMoreTokens()) + if(!_wildCardBindingKey2queues.isEmpty()) { - String next = routingTokens.nextToken(); - if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH)) - { - continue; - } - routingkeyList.add(next); - } - for (AMQShortString queue : _routingKey2queues.keySet()) - { - StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR); + AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); + + final int routingTokensCount = routingTokens.countTokens(); + - ArrayList<String> queueList = new ArrayList<String>(); + AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount]; - while (queTok.hasMoreTokens()) + if(routingTokensCount == 1) { - queueList.add(queTok.nextToken()); + routingkeyTokens[0] =routingKey; } + else + { - int depth = 0; - boolean matching = true; - boolean done = false; - int routingskip = 0; - int queueskip = 0; - while (matching && !done) - { - if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip))) + int token = 0; + while (routingTokens.hasMoreTokens()) { - done = true; - // if it was the routing key that ran out of digits - if (routingkeyList.size() == (depth + routingskip)) - { - if (queueList.size() > (depth + queueskip)) - { // a hash and it is the last entry - matching = - queueList.get(depth + queueskip).equals(AMQP_HASH) - && (queueList.size() == (depth + queueskip + 1)); - } - } - else if (routingkeyList.size() > (depth + routingskip)) - { - // There is still more routing key to check - matching = false; - } + AMQShortString next = routingTokens.nextToken(); - continue; + routingkeyTokens[token++] = next; } + } + for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet()) + { + + AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey); + + + boolean matching = true; + boolean done = false; - // if the values on the two topics don't match - if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip))) + int depthPlusRoutingSkip = 0; + int depthPlusQueueSkip = 0; + + final int bindingKeyTokensCount = bindingKeyTokens.length; + + while (matching && !done) { - if (queueList.get(depth + queueskip).equals(AMQP_STAR)) + + if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip)) { - depth++; + done = true; + + // if it was the routing key that ran out of digits + if (routingTokensCount == depthPlusRoutingSkip) + { + if (bindingKeyTokensCount > depthPlusQueueSkip) + { // a hash and it is the last entry + matching = + bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN) + && (bindingKeyTokensCount == (depthPlusQueueSkip + 1)); + } + } + else if (routingTokensCount > depthPlusRoutingSkip) + { + // There is still more routing key to check + matching = false; + } continue; } - else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) + + // if the values on the two topics don't match + if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip])) { - // Is this a # at the end - if (queueList.size() == (depth + queueskip + 1)) + if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN)) { - done = true; + depthPlusQueueSkip++; + depthPlusRoutingSkip++; continue; } - - // otherwise # in the middle - while (routingkeyList.size() > (depth + routingskip)) + else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)) { - if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) + // Is this a # at the end + if (bindingKeyTokensCount == (depthPlusQueueSkip + 1)) + { + done = true; + + continue; + } + + // otherwise # in the middle + while (routingTokensCount > depthPlusRoutingSkip) { - queueskip++; - depth++; + if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1])) + { + depthPlusQueueSkip += 2; + depthPlusRoutingSkip++; + + break; + } - break; + depthPlusRoutingSkip++; } - routingskip++; + continue; } - continue; + matching = false; } - matching = false; + depthPlusQueueSkip++; + depthPlusRoutingSkip++; } - depth++; + if (matching) + { + if(list == null) + { + list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey)); + } + else + { + list.addAll(_wildCardBindingKey2queues.get(bindingKey)); + } + } } - if (matching) + } + if(!_simpleBindingKey2queues.isEmpty()) + { + List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey); + if(list == null) + { + if(queues == null) + { + list = Collections.EMPTY_LIST; + } + else + { + list = new ArrayList<AMQQueue>(queues); + } + } + else if(queues != null) { - list.addAll(_routingKey2queues.get(queue)); + list.addAll(queues); } + } return list; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 57ae2bb6d4..e7c887f306 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -42,6 +42,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
@@ -205,10 +206,8 @@ public class FanoutExchange extends AbstractExchange _logger.debug("Publishing message to queue " + _queues);
}
- for (AMQQueue q : _queues)
- {
- payload.enqueue(q);
- }
+ payload.enqueue(new ArrayList(_queues));
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 2061803d65..32f58ed666 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -56,7 +56,7 @@ public class JMSSelectorFilter implements MessageFilter catch (AMQException e) { //fixme this needs to be sorted.. it shouldn't happen - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } return false; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index bb48539f50..7cd4afdb77 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,18 +24,18 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody> { - private static final Logger _log = Logger.getLogger(BasicConsumeMethodHandler.class); + private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class); private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler(); @@ -65,21 +65,21 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } else { - if (_log.isDebugEnabled()) + if (_logger.isDebugEnabled()) { - _log.debug("BasicConsume: from '" + body.getQueue() + - "' for:" + body.getConsumerTag() + - " nowait:" + body.getNowait() + - " args:" + body.getArguments()); + _logger.debug("BasicConsume: from '" + body.getQueue() + + "' for:" + body.getConsumerTag() + + " nowait:" + body.getNowait() + + " args:" + body.getArguments()); } AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue().intern()); if (queue == null) { - if (_log.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _log.trace("No queue for '" + body.getQueue() + "'"); + _logger.debug("No queue for '" + body.getQueue() + "'"); } if (body.getQueue() != null) { @@ -97,6 +97,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic final AMQShortString consumerTagName; + //Perform ACLs + vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue); + if (body.getConsumerTag() != null) { consumerTagName = body.getConsumerTag().intern(); @@ -123,7 +126,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } catch (org.apache.qpid.AMQInvalidArgumentException ise) { - _log.debug("Closing connection due to invalid selector"); + _logger.debug("Closing connection due to invalid selector"); MethodRegistry methodRegistry = session.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(), diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 8d69697350..f8f9127809 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -27,10 +27,10 @@ import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -82,7 +82,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB }
else
{
- if(!queue.performGet(session, channel, !body.getNoAck()))
+
+ //Perform ACLs
+ vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
+
+ if (!queue.performGet(session, channel, !body.getNoAck()))
{
MethodRegistry methodRegistry = session.getMethodRegistry();
// TODO - set clusterId
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 66afc61751..0f99a21ee5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -25,20 +25,19 @@ import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; -public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody> +public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody> { - private static final Logger _log = Logger.getLogger(BasicPublishMethodHandler.class); + private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class); private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler(); @@ -55,12 +54,9 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - - - - if (_log.isDebugEnabled()) + if (_logger.isDebugEnabled()) { - _log.debug("Publish received on channel " + channelId); + _logger.debug("Publish received on channel " + channelId); } AMQShortString exchange = body.getExchange(); @@ -90,8 +86,12 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi throw body.getChannelNotFoundException(channelId); } + //Access Control + vHost.getAccessManager().authorise(session, Permission.PUBLISH, body, e); + MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body); - channel.setPublishFrame(info, session); + info.setExchange(exchange); + channel.setPublishFrame(info, session, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index c48dd902eb..069cc6ea2c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -100,9 +100,9 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR } - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 3f604480b9..054674aed4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -29,7 +29,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -55,8 +54,8 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore(), - virtualHost.getExchangeRegistry()); + final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore() + ); session.addChannel(channel); ChannelOpenOkBody response; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index 4f62e0b930..f99e650979 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -23,14 +23,12 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.security.access.AccessResult; -import org.apache.qpid.server.security.access.AccessRights; import org.apache.log4j.Logger; public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody> @@ -79,22 +77,8 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con { session.setVirtualHost(virtualHost); - AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID(), AccessRights.Rights.ANY); - - switch (result.getStatus()) - { - default: - case REFUSED: - String error = "Any access denied to vHost '" + virtualHostName + "' by " - + result.getAuthorizer(); - - _logger.warn(error); - - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error); - case GRANTED: - _logger.info("Granted any access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID() - + " by '" + result.getAuthorizer() + "'"); - } + //Perform ACL + virtualHost.getAccessManager().authorise(session, Permission.ACCESS ,body, virtualHost); // See Spec (0.8.2). Section 3.1.2 Virtual Hosts if (session.getContextKey() == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index c4a57f0286..9a98bc9659 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -26,11 +26,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -58,7 +58,12 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); - + + if (!body.getPassive()) + { + //Perform ACL if request is not passive + virtualHost.getAccessManager().authorise(session, Permission.CREATE, body); + } if (_logger.isDebugEnabled()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 8f36e6c767..888ffcb2e5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -21,13 +21,12 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -51,6 +50,9 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + //Perform ACLs + virtualHost.getAccessManager().authorise(session, Permission.DELETE,body, + exchangeRegistry.getExchange(body.getExchange())); try { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index a365cd864a..0f6dc7a19d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -25,13 +25,13 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -106,6 +106,10 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> try { + + //Perform ACLs + virtualHost.getAccessManager().authorise(session, Permission.BIND, body, exch, queue, routingKey); + if (!exch.isBound(routingKey, body.getArguments(), queue)) { queue.bind(routingKey, body.getArguments(), exch); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 067c6ac285..7df864f189 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,17 +20,15 @@ */ package org.apache.qpid.server.handler; -import java.text.MessageFormat; import java.util.concurrent.atomic.AtomicInteger; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; -import org.apache.qpid.exchange.ExchangeDefaults; + import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; @@ -38,6 +36,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; @@ -47,7 +46,7 @@ import org.apache.commons.configuration.Configuration; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { - private static final Logger _log = Logger.getLogger(QueueDeclareHandler.class); + private static final Logger _logger = Logger.getLogger(QueueDeclareHandler.class); private static final QueueDeclareHandler _instance = new QueueDeclareHandler(); @@ -56,7 +55,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar return _instance; } - @Configured(path = "queue.auto_register", defaultValue = "false") + @Configured(path = "queue.auto_register", defaultValue = "true") public boolean autoRegister; private final AtomicInteger _counter = new AtomicInteger(); @@ -76,6 +75,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar MessageStore store = virtualHost.getMessageStore(); + if (!body.getPassive()) + { + //Perform ACL if request is not passive + virtualHost.getAccessManager().authorise(session, Permission.CREATE, body); + } final AMQShortString queueName; @@ -109,7 +113,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else { - queue = createQueue(queueName,body, virtualHost, session); + queue = createQueue(queueName, body, virtualHost, session); if (queue.isDurable() && !queue.isAutoDelete()) { store.createQueue(queue); @@ -120,7 +124,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); queue.bind(queueName, null, defaultExchange); - _log.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")"); + _logger.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")"); } } } @@ -152,7 +156,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queue.getConsumerCount()); session.writeFrame(responseBody.generateFrame(channelId)); - _log.info("Queue " + queueName + " declared successfully"); + _logger.info("Queue " + queueName + " declared successfully"); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 09b7de3a5c..310a73ffeb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.security.access.Permission; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { @@ -103,6 +104,10 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB } else { + + //Perform ACLs + virtualHost.getAccessManager().authorise(session, Permission.DELETE, body, queue); + int purged = queue.delete(body.getIfUnused(), body.getIfEmpty()); if (queue.isDurable()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 5bdca93bc6..cce49f13c7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.security.access.Permission;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
@@ -100,6 +101,10 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod }
else
{
+
+ //Perform ACLs
+ virtualHost.getAccessManager().authorise(session, Permission.PURGE, body, queue);
+
long purged = queue.clearQueue(channel.getStoreContext());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java index b056fa6797..e758e315aa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java @@ -1,110 +1,113 @@ -package org.apache.qpid.server.handler;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.protocol.AMQConstant;
-
-public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
-{
- private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
-
- private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
-
- public static QueueUnbindHandler getInstance()
- {
- return _instance;
- }
-
- private QueueUnbindHandler()
- {
- }
-
- public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
- {
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
- ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
-
- final AMQQueue queue;
- final AMQShortString routingKey;
-
- if (body.getQueue() == null)
- {
- AMQChannel channel = session.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
-
- queue = channel.getDefaultQueue();
-
- if (queue == null)
- {
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
- }
-
- routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
-
- }
- else
- {
- queue = queueRegistry.getQueue(body.getQueue());
- routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
- }
-
- if (queue == null)
- {
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
- }
- final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
- if (exch == null)
- {
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
- }
-
-
- try
- {
- queue.unBind(routingKey, body.getArguments(), exch);
- }
- catch (AMQInvalidRoutingKeyException rke)
- {
- throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
- }
- catch (AMQException e)
- {
- if(e.getErrorCode() == AMQConstant.NOT_FOUND)
- {
- throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
- }
- throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
- }
-
- if (_log.isInfoEnabled())
- {
- _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
- }
-
- MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
-
-
- }
-}
+package org.apache.qpid.server.handler; + +import org.apache.log4j.Logger; + +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.security.access.Permission; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.protocol.AMQConstant; + +public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody> +{ + private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class); + + private static final QueueUnbindHandler _instance = new QueueUnbindHandler(); + + public static QueueUnbindHandler getInstance() + { + return _instance; + } + + private QueueUnbindHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + + final AMQQueue queue; + final AMQShortString routingKey; + + if (body.getQueue() == null) + { + AMQChannel channel = session.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + + queue = channel.getDefaultQueue(); + + if (queue == null) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); + } + + routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(); + + } + else + { + queue = queueRegistry.getQueue(body.getQueue()); + routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(); + } + + if (queue == null) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + } + final Exchange exch = exchangeRegistry.getExchange(body.getExchange()); + if (exch == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); + } + + //Perform ACLs + virtualHost.getAccessManager().authorise(session, Permission.UNBIND, body, queue); + + try + { + queue.unBind(routingKey, body.getArguments(), exch); + } + catch (AMQInvalidRoutingKeyException rke) + { + throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString()); + } + catch (AMQException e) + { + if(e.getErrorCode() == AMQConstant.NOT_FOUND) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e); + } + throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); + } + + if (_log.isInfoEnabled()) + { + _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); + } + + MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody(); + session.writeFrame(responseBody.generateFrame(channelId)); + + + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java b/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java index 4fb260472d..a0ecc2bd85 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management; -import org.apache.qpid.server.security.access.UserManagement; +import org.apache.qpid.server.security.access.management.UserManagement; import org.apache.log4j.Logger; import javax.management.remote.MBeanServerForwarder; diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 1bfe1e3d35..d7a879180a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -72,7 +72,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -100,8 +100,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -127,7 +127,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final StoreContext storeContext = message.getStoreContext();
final long messageId = message.getMessageId();
- ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -151,8 +151,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -171,7 +171,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
- private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -187,10 +187,10 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame.toByteBuffer();
+ return deliverFrame;
}
- private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -205,7 +205,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame.toByteBuffer();
+ return getOkFrame;
}
public byte getProtocolMinorVersion()
@@ -218,7 +218,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return getProtocolSession().getProtocolMajorVersion();
}
- private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicReturnBody basicReturnBody =
@@ -228,13 +228,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter message.getMessagePublishInfo().getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
- return returnFrame.toByteBuffer();
+ return returnFrame;
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+ AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -247,14 +247,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 0bc2fcf6f7..646ef43826 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -12,10 +12,14 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -46,9 +50,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
@@ -58,8 +62,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if(bodyCount == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
writeFrame(compositeBlock);
}
@@ -73,9 +77,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
+
+ CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
//
@@ -84,7 +88,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext,messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -93,6 +97,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
{
@@ -101,11 +113,10 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final StoreContext storeContext = message.getStoreContext();
final long messageId = message.getMessageId();
- ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
if(bodyCount == 0)
@@ -124,9 +135,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -135,7 +146,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext, messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -145,41 +156,84 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
- private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- BasicDeliverBody deliverBody =
- methodRegistry.createBasicDeliverBody(consumerTag,
- deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+
+ final boolean isRedelivered = messageHandle.isRedelivered();
+ final AMQShortString exchangeName = pb.getExchange();
+ final AMQShortString routingKey = pb.getRoutingKey();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
- return deliverFrame.toByteBuffer();
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
}
- private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
BasicGetOkBody getOkBody =
- methodRegistry.createBasicGetOkBody(deliveryTag,
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
messageHandle.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame.toByteBuffer();
+ return getOkFrame;
}
public byte getProtocolMinorVersion()
@@ -192,26 +246,25 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return getProtocolSession().getProtocolMajorVersion();
}
- private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
BasicReturnBody basicReturnBody =
- methodRegistry.createBasicReturnBody(replyCode,
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
- return returnFrame.toByteBuffer();
+ return returnFrame;
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+ AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
//
@@ -221,14 +274,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
@@ -252,9 +304,69 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index be22a90d0b..9191ecf6ed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.plugins; import java.io.File; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,7 +29,6 @@ import org.apache.felix.framework.Felix; import org.apache.felix.framework.cache.BundleCache; import org.apache.felix.framework.util.FelixConstants; import org.apache.felix.framework.util.StringMap; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeType; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleException; @@ -72,7 +70,7 @@ public class PluginManager "org.apache.qpid.server.queue; version=0.2.1," + "javax.management.openmbean; version=1.0.0,"+ "javax.management; version=1.0.0,"+ - "uk.co.thebadgerset.junit.extensions.util; version=0.6.1," + "org.apache.qpid.junit.extensions.util; version=0.6.1," ); if (plugindir == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 0fe6d3636e..4267642b14 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -109,7 +109,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private FieldTable _clientProperties; private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); - private List<Integer> _closingChannelsList = new ArrayList<Integer>(); + private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>(); private ProtocolOutputConverter _protocolOutputConverter; private Principal _authorizedID; private MethodDispatcher _dispatcher; @@ -138,11 +138,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable catch (RuntimeException e) { e.printStackTrace(); - // throw e; + throw e; } - - // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, @@ -209,26 +207,39 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _logger.debug("Frame Received: " + frame); } - if (body instanceof AMQMethodBody) - { - methodFrameReceived(channelId, (AMQMethodBody) body); - } - else if (body instanceof ContentHeaderBody) - { - contentHeaderReceived(channelId, (ContentHeaderBody) body); - } - else if (body instanceof ContentBody) + // Check that this channel is not closing + if (channelAwaitingClosure(channelId)) { - contentBodyReceived(channelId, (ContentBody) body); + if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); + } + } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); + } + + return; + } } - else if (body instanceof HeartbeatBody) + + + + try { - // NO OP + body.handle(channelId, this); } - else + catch (AMQException e) { - _logger.warn("Unrecognised frame " + frame.getClass().getName()); + closeChannel(channelId); + throw e; } + } private void protocolInitiationReceived(ProtocolInitiation pi) @@ -271,32 +282,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - private void methodFrameReceived(int channelId, AMQMethodBody methodBody) + public void methodFrameReceived(int channelId, AMQMethodBody methodBody) { final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((evt.getMethod() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); - } - - return; - } - } - try { try @@ -358,6 +348,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _logger.info("Closing connection due to: " + e.getMessage()); } + markChannelAwaitingCloseOk(channelId); closeSession(); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(channelId)); @@ -365,17 +356,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } catch (Exception e) { - _stateManager.error(e); + for (AMQMethodListener listener : _frameListeners) { listener.error(e); } + _logger.error("Unexpected exception while processing frame. Closing connection.", e); + _minaProtocolSession.close(); } } - private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); @@ -384,13 +377,18 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } - private void contentBodyReceived(int channelId, ContentBody body) throws AMQException + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); channel.publishContentBody(body, this); } + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) + { + // NO - OP + } + /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -539,7 +537,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable try { channel.close(this); - markChannelawaitingCloseOk(channelId); + markChannelAwaitingCloseOk(channelId); } finally { @@ -550,11 +548,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void closeChannelOk(int channelId) { - removeChannel(channelId); + // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler. + // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory. + // We do it from the Close Handler as we are sending the OK back to the client. + // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException + // will send a close-ok.. Where we should call removeChannel. + // However, due to the poor exception handling on the client. The client-user will be notified of the + // InvalidArgument and if they then decide to close the session/connection then the there will be time + // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed. + //removeChannel(channelId); _closingChannelsList.remove(new Integer(channelId)); } - private void markChannelawaitingCloseOk(int channelId) + private void markChannelAwaitingCloseOk(int channelId) { _closingChannelsList.add(channelId); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 543e043bed..d8dbf97e49 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -29,9 +29,8 @@ import org.apache.mina.common.IoSession; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.filter.codec.QpidProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.SessionUtil; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; @@ -57,6 +56,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter private final IApplicationRegistry _applicationRegistry; + private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144"; + private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144"; + + private final int BUFFER_READ_LIMIT_SIZE; + private final int BUFFER_WRITE_LIMIT_SIZE; public AMQPFastProtocolHandler(Integer applicationRegistryInstance) { @@ -66,6 +70,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry) { _applicationRegistry = applicationRegistry; + + // Read the configuration from the application registry + BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE)); + BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE)); + _logger.debug("AMQPFastProtocolHandler created"); } @@ -82,7 +91,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter createSession(protocolSession, _applicationRegistry, codecFactory); _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress()); - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory); + final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory); ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). getConfiguredObject(ConnectorConfiguration.class); @@ -114,27 +123,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } - if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false)) + if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false)) { try { // //Add IO Protection Filters IoFilterChain chain = protocolSession.getFilterChain(); - int buf_size = 32768; - if (protocolSession.getConfig() instanceof SocketSessionConfig) - { - buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize(); - } protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE); writefilter.attach(chain); protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index d9a9d2273b..f501bc27d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.exchange.Exchange; import java.util.HashMap; import java.util.HashSet; @@ -82,12 +83,16 @@ public class AMQMessage private long _expiration; - private final int hashcode = System.identityHashCode(this); + + + private Exchange _exchange; + private static final boolean SYNCED_CLOCKS = + ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false); public String debugIdentity() { - return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; + return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; } public void setExpiration() @@ -97,7 +102,7 @@ public class AMQMessage long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); - if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false)) + if (SYNCED_CLOCKS) { _expiration = expiration; } @@ -126,6 +131,21 @@ public class AMQMessage return _referenceCount.get() > 0; } + public void setExchange(final Exchange exchange) + { + _exchange = exchange; + } + + public void route() throws AMQException + { + _exchange.route(this); + } + + public void enqueue(final List<AMQQueue> queues) + { + _transientMessageData.setDestinationQueues(queues); + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. @@ -637,8 +657,6 @@ public class AMQMessage // now that it has all been received, before we attempt delivery _txnContext.messageFullyReceived(isPersistent()); - _transientMessageData = null; - for (AMQQueue q : destinationQueues) { // Increment the references to this message for each queue delivery. @@ -649,7 +667,7 @@ public class AMQMessage } finally { - destinationQueues.clear(); + // Remove refence for routing process . Reference count should now == delivered queue count decrementReference(storeContext); } @@ -687,10 +705,6 @@ public class AMQMessage _transientMessageData = transientMessageData; } - public void clearTransientMessageData() - { - _transientMessageData = null; - } public String toString() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 53c36d9718..7c6db0b4b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -36,7 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; -import java.util.List; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -154,10 +155,9 @@ public class AMQQueue implements Managable, Comparable /** total messages received by the queue since startup. */ public AtomicLong _totalMessagesReceived = new AtomicLong(); - public int compareTo(Object o) - { - return _name.compareTo(((AMQQueue) o).getName()); - } + + private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -200,6 +200,13 @@ public class AMQQueue implements Managable, Comparable _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + + // This ensure that the notification checks for the configured alerts are created. + setMaximumMessageAge(_maximumMessageAge); + setMaximumMessageCount(_maximumMessageCount); + setMaximumMessageSize(_maximumMessageSize); + setMaximumQueueDepth(_maximumQueueDepth); + } private AMQQueueMBean createMBean() throws AMQException @@ -214,7 +221,7 @@ public class AMQQueue implements Managable, Comparable } } - public AMQShortString getName() + public final AMQShortString getName() { return _name; } @@ -540,9 +547,17 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageSize; } - public void setMaximumMessageSize(long value) + public void setMaximumMessageSize(final long maximumMessageSize) { - _maximumMessageSize = value; + _maximumMessageSize = maximumMessageSize; + if(maximumMessageSize == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); + } } public int getConsumerCount() @@ -565,9 +580,20 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageCount; } - public void setMaximumMessageCount(long value) + public void setMaximumMessageCount(final long maximumMessageCount) { - _maximumMessageCount = value; + _maximumMessageCount = maximumMessageCount; + if(maximumMessageCount == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); + } + + + } public long getMaximumQueueDepth() @@ -576,9 +602,18 @@ public class AMQQueue implements Managable, Comparable } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(long value) + public void setMaximumQueueDepth(final long maximumQueueDepth) { - _maximumQueueDepth = value; + _maximumQueueDepth = maximumQueueDepth; + if(maximumQueueDepth == 0L) + { + _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); + } + } public long getOldestMessageArrivalTime() @@ -661,6 +696,12 @@ public class AMQQueue implements Managable, Comparable } _subscribers.addSubscriber(subscription); + if(exclusive) + { + _subscribers.setExclusive(true); + } + + subscription.start(); } private boolean isExclusive() @@ -692,6 +733,7 @@ public class AMQQueue implements Managable, Comparable ps, channel, consumerTag, this)); } + _subscribers.setExclusive(false); Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) @@ -805,7 +847,7 @@ public class AMQQueue implements Managable, Comparable public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException { AMQMessage msg = entry.getMessage(); - _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst); + _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst); try { msg.checkDeliveredToConsumer(); @@ -938,6 +980,14 @@ public class AMQQueue implements Managable, Comparable public void setMaximumMessageAge(long maximumMessageAge) { _maximumMessageAge = maximumMessageAge; + if(maximumMessageAge == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); + } } public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry) @@ -950,4 +1000,25 @@ public class AMQQueue implements Managable, Comparable return new QueueEntry(this, amqMessage); } + public int compareTo(Object o) + { + return _name.compareTo(((AMQQueue) o).getName()); + } + + + public void removeExpiredIfNoSubscribers() throws AMQException + { + synchronized(_subscribers.getChangeLock()) + { + if(_subscribers.isEmpty()) + { + _deliveryMgr.removeExpired(); + } + } + } + + public final Set<NotificationCheck> getNotificationChecks() + { + return _notificationChecks; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 9e32de3f76..348a136f9d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -54,10 +54,7 @@ import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.Iterator; -import java.util.List; +import java.util.*; /** * AMQQueueMBean is the management bean for an {@link AMQQueue}. @@ -97,6 +94,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; private Notification _lastNotification = null; + + + @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException { @@ -249,16 +249,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public void checkForNotification(AMQMessage msg) throws AMQException, JMException { - final long currentTime = System.currentTimeMillis(); - final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); - for (NotificationCheck check : NotificationCheck.values()) + if(!notificationChecks.isEmpty()) { - if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + + for (NotificationCheck check : notificationChecks) { - if (check.notifyIfNecessary(msg, _queue, this)) + if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) { - _lastNotificationTimes[check.ordinal()] = currentTime; + if (check.notifyIfNecessary(msg, _queue, this)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 3cf2cb9b12..7dfcae95c3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -212,6 +212,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + /** + * NOTE : This method should only be called when there are no active subscribers + */ + public void removeExpired() throws AMQException + { + _lock.lock(); + + + for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) + { + QueueEntry entry = iter.next(); + if(entry.expired()) + { + // fixme: Currently we have to update the total byte size here for the data in the queue + _totalMessageSize.addAndGet(-entry.getSize()); + _queue.dequeue(_reapingStoreContext,entry); + iter.remove(); + } + } + + + _lock.unlock(); + } + /** @return the state of the async processor. */ public boolean isProcessingAsync() { @@ -278,9 +302,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void populatePreDeliveryQueue(Subscription subscription) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")"); + _log.debug("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")"); } Iterator<QueueEntry> currentQueue = _messages.iterator(); @@ -289,13 +313,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { QueueEntry entry = currentQueue.next(); - if (!entry.getDeliveredToConsumer()) + if (subscription.hasInterest(entry)) { - if (subscription.hasInterest(entry)) - { - subscription.enqueueForPreDelivery(entry, false); - } + subscription.enqueueForPreDelivery(entry, false); } + } } @@ -339,8 +361,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(), deliveryTag, _queue.getMessageCount()); - _totalMessageSize.addAndGet(-entry.getSize()); + } + _totalMessageSize.addAndGet(-entry.getSize()); if (!acks) { @@ -484,9 +507,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (purgeMessage(entry, sub, purgeOnly)) { AMQMessage message = entry.getMessage(); - // if we are purging then ensure we mark this message taken for the current subscriber - // the current subscriber may be null in the case of a get or a purge but this is ok. -// boolean alreadyTaken = message.taken(_queue, sub); //remove the already taken message or expired QueueEntry removed = messages.poll(); @@ -494,7 +514,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager assert removed == entry; // if the message expired then the _totalMessageSize needs adjusting - if (message.expired(_queue) && !entry.getDeliveredToConsumer()) + if (message.expired(_queue) && !entry.taken(sub)) { _totalMessageSize.addAndGet(-entry.getSize()); @@ -512,9 +532,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //else the clean up is not required as the message has already been taken for this queue therefore // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated. - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace("Removed taken message:" + message.debugIdentity()); + _log.debug("Removed taken message:" + message.debugIdentity()); } // try the next message @@ -610,9 +630,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages); - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + + _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + ") from queue (" + System.identityHashCode(messageQueue) + ") AMQQueue (" + System.identityHashCode(queue) + ")"); } @@ -638,9 +658,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // message will be null if we have no messages in the messageQueue. if (entry == null) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + _log.debug(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); } return; } @@ -680,9 +700,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (messageQueue == sub.getResendQueue()) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub); + _log.debug(debugIdentity() + "All messages sent from resendQueue for " + sub); } if (messageQueue.isEmpty()) { @@ -842,17 +862,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager for (Subscription sub : _subscriptions.getSubscriptions()) { - // stop if the message gets delivered whilst PreDelivering if we have a shared queue. - if (_queue.isShared() && entry.getDeliveredToConsumer()) - { - if (debugEnabled) - { - _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(entry) + - ") is already delivered."); - } - continue; - } - // Only give the message to those that want them. if (sub.hasInterest(entry)) { @@ -894,9 +903,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!s.isSuspended()) { - if (_log.isTraceEnabled()) + if (debugEnabled) { - _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + + _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index f7f35a9319..1568f58e2e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -97,4 +97,6 @@ interface DeliveryManager long getOldestMessageArrival(); void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg); + + void removeExpired() throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java index 60c1a8f574..e6377b33da 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java @@ -51,7 +51,7 @@ class ExchangeBindings ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments) { - _routingKey = routingKey; + _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey; _exchange = exchange; _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments; } @@ -74,8 +74,7 @@ class ExchangeBindings public int hashCode() { return (_exchange == null ? 0 : _exchange.hashCode()) - + (_routingKey == null ? 0 : _routingKey.hashCode()) - + (_arguments == null ? 0 : _arguments.hashCode()); + + (_routingKey == null ? 0 : _routingKey.hashCode()); } public boolean equals(Object o) @@ -86,8 +85,7 @@ class ExchangeBindings } ExchangeBinding eb = (ExchangeBinding) o; return _exchange.equals(eb._exchange) - && _routingKey.equals(eb._routingKey) - && _arguments.equals(eb._arguments); + && _routingKey.equals(eb._routingKey); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index 6b3d65661f..6f9efd3200 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -29,9 +29,9 @@ public enum NotificationCheck {
boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
- int msgCount = queue.getMessageCount();
+ int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
return true;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index a706098b71..96ce6743ec 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -45,8 +45,6 @@ public interface Subscription void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst); - boolean isAutoClose(); - void close(); boolean isClosed(); @@ -60,4 +58,6 @@ public interface Subscription Object getSendLock(); AMQChannel getChannel(); + + void start(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index e631481cc8..bde3ad8ec9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -292,14 +292,17 @@ public class SubscriptionImpl implements Subscription queue.dequeue(storeContext, entry); } +/* + if (_sendLock.get()) + { + _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); + } +*/ + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); - if (_sendLock.get()) - { - _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); - } if (_acks) { @@ -308,10 +311,11 @@ public class SubscriptionImpl implements Subscription protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag); - if (!_acks) - { - entry.getMessage().decrementReference(storeContext); - } + + } + if (!_acks) + { + entry.getMessage().decrementReference(storeContext); } } finally @@ -367,59 +371,60 @@ public class SubscriptionImpl implements Subscription // return false; } - final AMQProtocolSession publisher = entry.getMessage().getPublisher(); + //todo - client id should be recoreded and this test removed but handled below - if (_noLocal && publisher != null) + if (_noLocal) { - // We don't want local messages so check to see if message is one we sent - Object localInstance; - Object msgInstance; - if ((protocolSession.getClientProperties() != null) && - (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + final AMQProtocolSession publisher = entry.getMessage().getPublisher(); + if(publisher != null) + { + // We don't want local messages so check to see if message is one we sent + Object localInstance; + Object msgInstance; - if ((publisher.getClientProperties() != null) && - (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (localInstance == msgInstance || localInstance.equals(msgInstance)) + + if ((publisher.getClientProperties() != null) && + (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + -// msg.debugIdentity() + ")"); -// } - return false; + if (localInstance == msgInstance || localInstance.equals(msgInstance)) + { + // if (_logger.isTraceEnabled()) + // { + // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + // msg.debugIdentity() + ")"); + // } + return false; + } } } - } - else - { + else + { - localInstance = protocolSession.getClientIdentifier(); - //todo - client id should be recoreded and this test removed but handled here + localInstance = protocolSession.getClientIdentifier(); + //todo - client id should be recoreded and this test removed but handled here - msgInstance = publisher.getClientIdentifier(); - if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + -// msg.debugIdentity() + ")"); -// } - return false; + msgInstance = publisher.getClientIdentifier(); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + { + // if (_logger.isTraceEnabled()) + // { + // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + // msg.debugIdentity() + ")"); + // } + return false; + } } - } - + } } - if (_logger.isTraceEnabled()) - { - _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); - } return checkFilters(entry); } @@ -433,23 +438,7 @@ public class SubscriptionImpl implements Subscription private boolean checkFilters(QueueEntry msg) { - if (_filters != null) - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has filters."); -// } - return _filters.allAllow(msg.getMessage()); - } - else - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no filters"); -// } - - return true; - } + return (_filters == null) || _filters.allAllow(msg.getMessage()); } public Queue<QueueEntry> getPreDeliveryQueue() @@ -472,7 +461,7 @@ public class SubscriptionImpl implements Subscription } } - public boolean isAutoClose() + private boolean isAutoClose() { return _autoClose; } @@ -534,19 +523,24 @@ public class SubscriptionImpl implements Subscription { _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this); - ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); - _sentClose = true; - - //fixme JIRA do this better + boolean unregisteredOK = false; try { - channel.unsubscribeConsumer(protocolSession, consumerTag); + unregisteredOK = channel.unsubscribeConsumer(protocolSession, consumerTag); } catch (AMQException e) { // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag. + _logger.info("Unable to UnsubscribeConsumer :" + consumerTag +" so not going to send CancelOK."); + } + + if (unregisteredOK) + { + ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); + _sentClose = true; } + } } @@ -563,9 +557,9 @@ public class SubscriptionImpl implements Subscription { QueueEntry resent = _resendQueue.poll(); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Removed for resending:" + resent.debugIdentity()); + _logger.debug("Removed for resending:" + resent.debugIdentity()); } resent.release(); @@ -613,7 +607,7 @@ public class SubscriptionImpl implements Subscription public boolean wouldSuspend(QueueEntry msg) { - return channel.wouldSuspend(msg.getMessage()); + return _acks && channel.wouldSuspend(msg.getMessage()); } public Queue<QueueEntry> getResendQueue() @@ -677,4 +671,19 @@ public class SubscriptionImpl implements Subscription return channel; } + public void start() + { + //Check to see if we need to autoclose + if (filtersMessages()) + { + if (isAutoClose()) + { + if (_messages.isEmpty()) + { + autoclose(); + } + } + } + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b73b8d7e07..882efd380d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -37,7 +37,9 @@ class SubscriptionSet implements WeightedSubscriptionManager /** Used to control the round robin delivery of content */ private int _currentSubscriber; - private final Object _subscriptionsChange = new Object(); + + private final Object _changeLock = new Object(); + private volatile boolean _exclusive; /** Accessor for unit tests. */ @@ -48,7 +50,7 @@ class SubscriptionSet implements WeightedSubscriptionManager public void addSubscriber(Subscription subscription) { - synchronized (_subscriptionsChange) + synchronized (_changeLock) { _subscriptions.add(subscription); } @@ -66,7 +68,7 @@ class SubscriptionSet implements WeightedSubscriptionManager // TODO: possibly need O(1) operation here. Subscription sub = null; - synchronized (_subscriptionsChange) + synchronized (_changeLock) { int subIndex = _subscriptions.indexOf(subscription); @@ -115,10 +117,7 @@ class SubscriptionSet implements WeightedSubscriptionManager */ public Subscription nextSubscriber(QueueEntry msg) { - if (_subscriptions.isEmpty()) - { - return null; - } + try { @@ -142,30 +141,64 @@ class SubscriptionSet implements WeightedSubscriptionManager private Subscription nextSubscriberImpl(QueueEntry msg) { - final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); - while (iterator.hasNext()) + if(_exclusive) { - Subscription subscription = iterator.next(); - ++_currentSubscriber; - subscriberScanned(); - - if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + try { - if (subscription.hasInterest(msg)) + Subscription subscription = _subscriptions.get(0); + subscriberScanned(); + + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) { - // if the queue is not empty then this client is ready to receive a message. - //FIXME the queue could be full of sent messages. - // Either need to clean all PDQs after sending a message - // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + if (subscription.hasInterest(msg)) { - return subscription; + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } } } } + catch(IndexOutOfBoundsException e) + { + } + return null; } + else + { + if (_subscriptions.isEmpty()) + { + return null; + } + final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); + while (iterator.hasNext()) + { + Subscription subscription = iterator.next(); + ++_currentSubscriber; + subscriberScanned(); - return null; + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + { + if (subscription.hasInterest(msg)) + { + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } + } + } + } + + return null; + } } /** Overridden in test classes. */ @@ -226,4 +259,16 @@ class SubscriptionSet implements WeightedSubscriptionManager { return _subscriptions.size(); } + + + public Object getChangeLock() + { + return _changeLock; + } + + public void setExclusive(final boolean exclusive) + { + _exclusive = exclusive; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java index 79ee6b93a3..9b91c71a1d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; +import java.util.Collections; import org.apache.qpid.AMQException; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -60,7 +62,7 @@ public class TransientMessageData * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done * by the message handle. */ - private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>(); + private List<AMQQueue> _destinationQueues; public MessagePublishInfo getMessagePublishInfo() { @@ -74,7 +76,7 @@ public class TransientMessageData public List<AMQQueue> getDestinationQueues() { - return _destinationQueues; + return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues; } public void setDestinationQueues(List<AMQQueue> destinationQueues) @@ -109,6 +111,10 @@ public class TransientMessageData public void addDestinationQueue(AMQQueue queue) { + if(_destinationQueues == null) + { + _destinationQueues = new ArrayList<AMQQueue>(); + } _destinationQueues.add(queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 42c32dcf00..fef958000a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,8 +39,8 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.security.access.AccessManager; -import org.apache.qpid.server.security.access.AccessManagerImpl; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.AMQException; @@ -52,15 +52,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry private AuthenticationManager _authenticationManager; - private AccessManager _accessManager; + private ACLPlugin _accessManager; private PrincipalDatabaseManager _databaseManager; private VirtualHostRegistry _virtualHostRegistry; - - private final Map<String, VirtualHost> _virtualHosts = new ConcurrentHashMap<String, VirtualHost>(); - private PluginManager _pluginManager; @@ -110,9 +107,9 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _virtualHostRegistry = new VirtualHostRegistry(); - _accessManager = new AccessManagerImpl("default", _configuration); + _accessManager = ACLManager.loadACLManager("default", _configuration); - _databaseManager = new ConfigurationFilePrincipalDatabaseManager(); + _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration); _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); @@ -121,7 +118,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _managedObjectRegistry.start(); _pluginManager = new PluginManager(_configuration.getString("plugin-directory")); - + initialiseVirtualHosts(); } @@ -154,7 +151,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry return _virtualHostRegistry; } - public AccessManager getAccessManager() + public ACLPlugin getAccessManager() { return _accessManager; } @@ -178,7 +175,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { return getConfiguration().getList("virtualhosts.virtualhost.name"); } - + public PluginManager getPluginManager() { return _pluginManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index 6aac21a161..ca10fbdba2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; -import org.apache.qpid.server.security.access.AccessManager; +import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public interface IApplicationRegistry @@ -68,8 +68,8 @@ public interface IApplicationRegistry VirtualHostRegistry getVirtualHostRegistry(); - AccessManager getAccessManager(); - + ACLPlugin getAccessManager(); + PluginManager getPluginManager(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java index 35d036d20f..539f32a732 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java @@ -23,33 +23,35 @@ package org.apache.qpid.server.security.access; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.security.access.plugins.DenyAll; import org.apache.qpid.configuration.PropertyUtils; import org.apache.log4j.Logger; import java.util.List; import java.lang.reflect.Method; -import java.security.Principal; -public class AccessManagerImpl implements AccessManager +public class ACLManager { - private static final Logger _logger = Logger.getLogger(AccessManagerImpl.class); + private static final Logger _logger = Logger.getLogger(ACLManager.class); - AccessManager _accessManager; - - public AccessManagerImpl(String name, Configuration hostConfig) throws ConfigurationException + public static ACLPlugin loadACLManager(String name, Configuration hostConfig) throws ConfigurationException { + ACLPlugin aclPlugin = ApplicationRegistry.getInstance().getAccessManager(); + if (hostConfig == null) { - _logger.warn("No Configuration specified. Using default access controls for VirtualHost:'" + name + "'"); - return; + _logger.warn("No Configuration specified. Using default ACLPlugin '" + aclPlugin.getPluginName() + + "' for VirtualHost:'" + name + "'"); + return aclPlugin; } String accessClass = hostConfig.getString("security.access.class"); if (accessClass == null) { - _logger.warn("No access control specified. Using default access controls for VirtualHost:'" + name + "'"); - return; + + _logger.warn("No ACL Plugin specified. Using default ACL Plugin '" + aclPlugin.getPluginName() + + "' for VirtualHost:'" + name + "'"); + return aclPlugin; } Object o; @@ -59,26 +61,35 @@ public class AccessManagerImpl implements AccessManager } catch (Exception e) { - throw new ConfigurationException("Error initialising access control: " + e, e); + throw new ConfigurationException("Error initialising ACL: " + e, e); } - if (!(o instanceof AccessManager)) + if (!(o instanceof ACLPlugin)) { - throw new ConfigurationException("Access control must implement the VirtualHostAccess interface"); + throw new ConfigurationException("ACL Plugins must implement the ACLPlugin interface"); } - initialiseAccessControl((AccessManager) o, hostConfig); + initialiseAccessControl((ACLPlugin) o, hostConfig); - _accessManager = (AccessManager) o; - - _logger.info("Initialised access control for virtualhost '" + name + "' successfully"); + aclPlugin = getManager((ACLPlugin) o); + if (_logger.isInfoEnabled()) + { + _logger.info("Initialised ACL Plugin '" + aclPlugin.getPluginName() + + "' for virtualhost '" + name + "' successfully"); + } + return aclPlugin; } - private void initialiseAccessControl(AccessManager accessManager, Configuration config) + private static void initialiseAccessControl(ACLPlugin accessManager, Configuration config) throws ConfigurationException { + //First provide the ACLPlugin with the host configuration + + accessManager.setConfiguaration(config); + + //Provide additional attribute customisation. String baseName = "security.access.attributes.attribute."; List<String> argumentNames = config.getList(baseName + "name"); List<String> argumentValues = config.getList(baseName + "value"); @@ -123,33 +134,28 @@ public class AccessManagerImpl implements AccessManager } } - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); - } - public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights) + private static ACLPlugin getManager(ACLPlugin manager) { - if (_accessManager == null) + if (manager == null) { - if (ApplicationRegistry.getInstance().getAccessManager() == this) + if (ApplicationRegistry.getInstance().getAccessManager() == null) { - _logger.warn("No Default access manager specified DENYING ALL ACCESS"); - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); + return new DenyAll(); } else { - return ApplicationRegistry.getInstance().getAccessManager().isAuthorized(accessObject, user, rights); + return ApplicationRegistry.getInstance().getAccessManager(); } } else { - return _accessManager.isAuthorized(accessObject, user, rights); + return manager; } } - public String getName() + public static Logger getLogger() { - return "AccessManagerImpl"; + return _logger; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java new file mode 100644 index 0000000000..7855f147b4 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access; + +import org.apache.qpid.framing.AMQMethodBody; + +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.AMQConnectionException; +import org.apache.commons.configuration.Configuration; + + +public interface ACLPlugin +{ + /** + * Pseudo-Code: + * Identify requested RighConnectiont + * Lookup users ability for that right. + * if rightsExists + * Validate right on object + * Return result + * e.g + * User, CONSUME , Queue + * User, CONSUME , Exchange + RoutingKey + * User, PUBLISH , Exchange + RoutingKey + * User, CREATE , Exchange || Queue + * User, BIND , Exchange + RoutingKey + Queue + * + * @param session - The session requesting access + * @param permission - The permission requested + * @param parameters - The above objects that are used to authorise the request. + * @return The AccessResult decision + */ + //todo potential refactor this ConnectionException Out of here + AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException; + + String getPluginName(); + + void setConfiguaration(Configuration config); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java index b8d8fc605a..89cead69b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java @@ -30,15 +30,15 @@ public class AccessResult StringBuilder _authorizer; AccessStatus _status; - public AccessResult(AccessManager authorizer, AccessStatus status) + public AccessResult(ACLPlugin authorizer, AccessStatus status) { _status = status; - _authorizer = new StringBuilder(authorizer.getName()); + _authorizer = new StringBuilder(authorizer.getPluginName()); } - public void setAuthorizer(AccessManager authorizer) + public void setAuthorizer(ACLPlugin authorizer) { - _authorizer.append(authorizer.getName()); + _authorizer.append(authorizer.getPluginName()); } public String getAuthorizer() @@ -56,10 +56,10 @@ public class AccessResult return _status; } - public void addAuthorizer(AccessManager accessManager) + public void addAuthorizer(ACLPlugin accessManager) { _authorizer.insert(0, "->"); - _authorizer.insert(0, accessManager.getName()); + _authorizer.insert(0, accessManager.getPluginName()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java deleted file mode 100644 index 1ddca3a64e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import java.security.Principal; - -public class AllowAll implements AccessManager -{ - - public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights) - { - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); - } - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); - } - - public String getName() - { - return "AllowAll"; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java deleted file mode 100644 index bf40eeba4e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import java.security.Principal; - -public class DenyAll implements AccessManager -{ - public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights) - { - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - - public String getName() - { - return "DenyAll"; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java deleted file mode 100644 index 291bc714ed..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.FileNotFoundException; -import java.io.File; -import java.util.regex.Pattern; -import java.security.Principal; - -/** - * Represents a user database where the account information is stored in a simple flat file. - * - * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn - * - * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text. - */ -public class FileAccessManager implements AccessManager -{ - private static final Logger _logger = Logger.getLogger(FileAccessManager.class); - - protected File _accessFile; - - protected Pattern _regexp = Pattern.compile(":"); - - private static final short USER_INDEX = 0; - private static final short VIRTUALHOST_INDEX = 1; - - public void setAccessFile(String accessFile) throws FileNotFoundException - { - File f = new File(accessFile); - _logger.info("FileAccessManager using file " + f.getAbsolutePath()); - _accessFile = f; - if (!f.exists()) - { - throw new FileNotFoundException("Cannot find access file " + f); - } - if (!f.canRead()) - { - throw new FileNotFoundException("Cannot read access file " + f + - ". Check permissions."); - } - } - - /** - * Looks up the virtual hosts for a specified user in the access file. - * - * @param user The user to lookup - * - * @return a list of virtualhosts - */ - private VirtualHostAccess[] lookupVirtualHost(String user) - { - String[] results = lookup(user, VIRTUALHOST_INDEX); - VirtualHostAccess vhosts[] = new VirtualHostAccess[results.length]; - - for (int index = 0; index < results.length; index++) - { - vhosts[index] = new VirtualHostAccess(results[index]); - } - - return vhosts; - } - - - private String[] lookup(String user, int index) - { - try - { - BufferedReader reader = null; - try - { - reader = new BufferedReader(new FileReader(_accessFile)); - String line; - - while ((line = reader.readLine()) != null) - { - String[] result = _regexp.split(line); - if (result == null || result.length < (index + 1)) - { - continue; - } - - if (user.equals(result[USER_INDEX])) - { - return result[index].split(","); - } - } - return null; - } - finally - { - if (reader != null) - { - reader.close(); - } - } - } - catch (IOException ioe) - { - //ignore - } - return null; - } - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); - } - - public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights) - { - if (accessObject instanceof VirtualHost) - { - VirtualHostAccess[] hosts = lookupVirtualHost(user.getName()); - - if (hosts != null) - { - for (VirtualHostAccess host : hosts) - { - if (accessObject.getAccessableName().equals(host.getVirtualHost())) - { - if (host.getAccessRights().allows(rights)) - { - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); - } - else - { - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - } - } - } - } -// else if (accessObject instanceof AMQQueue) -// { -// String[] queues = lookupQueue(username, ((AMQQueue) accessObject).getVirtualHost()); -// -// if (queues != null) -// { -// for (String queue : queues) -// { -// if (accessObject.getAccessableName().equals(queue)) -// { -// return new AccessResult(this, AccessResult.AccessStatus.GRANTED); -// } -// } -// } -// } - - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - - public String getName() - { - return "FileAccessManager"; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java index d70a6dc8f4..5d439a99eb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java @@ -1,34 +1,37 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import java.security.Principal; - -public interface AccessManager -{ - AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights); - - @Deprecated - AccessResult isAuthorized(Accessable accessObject, String username); - - String getName(); - -} +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public enum Permission
+{
+ CONSUME,
+ PUBLISH,
+ CREATE,
+ ACCESS,
+ BIND,
+ UNBIND,
+ DELETE,
+ PURGE
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java deleted file mode 100644 index 6ccadb2e7d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.log4j.Logger; - -import java.security.Principal; - -public class PrincipalDatabaseAccessManager implements AccessManager -{ - private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAccessManager.class); - - PrincipalDatabase _database; - AccessManager _default; - - public PrincipalDatabaseAccessManager() - { - _default = null; - } - - public void setDefaultAccessManager(String defaultAM) - { - if (defaultAM.equals("AllowAll")) - { - _default = new AllowAll(); - } - - if (defaultAM.equals("DenyAll")) - { - _default = new DenyAll(); - } - } - - public void setPrincipalDatabase(String database) - { - _database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(database); - if (!(_database instanceof AccessManager)) - { - _logger.warn("Database '" + database + "' cannot perform access management"); - } - } - - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); - } - - public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights) - { - AccessResult result; - - if (_database == null) - { - if (_default != null) - { - result = _default.isAuthorized(accessObject, username, rights); - } - else - { - throw new RuntimeException("Principal Database and default Access Manager are both null unable to perform Access Control"); - } - } - else - { - if (!(_database instanceof AccessManager)) - { - _logger.warn("Specified PrincipalDatabase is not an AccessManager so using default AccessManager"); - result = _default.isAuthorized(accessObject, username, rights); - } - else - { - result = ((AccessManager) _database).isAuthorized(accessObject, username, rights); - } - } - - result.addAuthorizer(this); - - return result; - } - - public String getName() - { - return "PrincipalDatabaseFileAccessManager"; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java new file mode 100755 index 0000000000..23073e0613 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java @@ -0,0 +1,579 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.security.access; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.exchange.Exchange; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class PrincipalPermissions +{ + + private static final Object CONSUME_QUEUES_KEY = new Object(); + private static final Object CONSUME_TEMPORARY_KEY = new Object(); + private static final Object CONSUME_OWN_QUEUES_ONLY_KEY = new Object(); + + private static final Object CREATE_QUEUES_KEY = new Object(); + private static final Object CREATE_EXCHANGES_KEY = new Object(); + + private static final Object CREATE_QUEUE_TEMPORARY_KEY = new Object(); + private static final Object CREATE_QUEUE_QUEUES_KEY = new Object(); + private static final Object CREATE_QUEUE_EXCHANGES_KEY = new Object(); + + private static final Object CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY = new Object(); + private static final Object CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY = new Object(); + + private static final int PUBLISH_EXCHANGES_KEY = 0; + + private Map _permissions; + + private String _user; + + + public PrincipalPermissions(String user) + { + _user = user; + _permissions = new ConcurrentHashMap(); + } + + public void grant(Permission permission, Object... parameters) + { + switch (permission) + { + case ACCESS: + break; // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS + case BIND: + break; // All the details are currently included in the create setup. + case CONSUME: // Parameters : AMQShortString queueName, Boolean Temporary, Boolean ownQueueOnly + Map consumeRights = (Map) _permissions.get(permission); + + if (consumeRights == null) + { + consumeRights = new ConcurrentHashMap(); + _permissions.put(permission, consumeRights); + } + + //if we have parametsre + if (parameters.length > 0) + { + AMQShortString queueName = (AMQShortString) parameters[0]; + Boolean temporary = (Boolean) parameters[1]; + Boolean ownQueueOnly = (Boolean) parameters[2]; + + if (temporary) + { + consumeRights.put(CONSUME_TEMPORARY_KEY, true); + } + else + { + consumeRights.put(CONSUME_TEMPORARY_KEY, false); + } + + if (ownQueueOnly) + { + consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true); + } + else + { + consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false); + } + + + LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY); + if (queues == null) + { + queues = new LinkedList(); + consumeRights.put(CONSUME_QUEUES_KEY, queues); + } + + if (queueName != null) + { + queues.add(queueName); + } + } + + + break; + case CREATE: // Parameters : Boolean temporary, AMQShortString queueName + // , AMQShortString exchangeName , AMQShortString routingKey + // || AMQShortString exchangeName , AMQShortString Class + + Map createRights = (Map) _permissions.get(permission); + + if (createRights == null) + { + createRights = new ConcurrentHashMap(); + _permissions.put(permission, createRights); + + } + + //The existence of the empty map mean permission to all. + if (parameters.length == 0) + { + return; + } + + + if (parameters[0] instanceof Boolean) //Create Queue : + // Boolean temporary, [AMQShortString queueName, AMQShortString exchangeName , AMQShortString routingKey] + { + Boolean temporary = (Boolean) parameters[0]; + + AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null; + AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null; + //Set the routingkey to the specified value or the queueName if present + AMQShortString routingKey = parameters.length > 3 ? (AMQShortString) parameters[3] : queueName; + + // Get the queues map + Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY); + + if (create_queues == null) + { + create_queues = new ConcurrentHashMap(); + createRights.put(CREATE_QUEUES_KEY, create_queues); + } + + //Allow all temp queues to be created + create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary); + + //Create empty list of queues + Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY); + + if (create_queues_queues == null) + { + create_queues_queues = new ConcurrentHashMap(); + create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues); + } + + // We are granting CREATE rights to all temporary queues only + if (parameters.length == 1) + { + return; + } + + // if we have a queueName then we need to store any associated exchange / rk bindings + if (queueName != null) + { + Map queue = (Map) create_queues_queues.get(queueName); + if (queue == null) + { + queue = new ConcurrentHashMap(); + create_queues_queues.put(queueName, queue); + } + + if (exchangeName != null) + { + queue.put(exchangeName, routingKey); + } + + //If no exchange is specified then the presence of the queueName in the map says any exchange is ok + } + + // Store the exchange that we are being granted rights to. This will be used as part of binding + + //Lookup the list of exchanges + Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY); + + if (create_queues_exchanges == null) + { + create_queues_exchanges = new ConcurrentHashMap(); + create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges); + } + + //if we have an exchange + if (exchangeName != null) + { + //Retrieve the list of permitted exchanges. + Map exchanges = (Map) create_queues_exchanges.get(exchangeName); + + if (exchanges == null) + { + exchanges = new ConcurrentHashMap(); + create_queues_exchanges.put(exchangeName, exchanges); + } + + //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY + exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary); + + //Store the binding details of queue/rk for this exchange. + if (queueName != null) + { + //Retrieve the list of permitted routingKeys. + Map rKeys = (Map) exchanges.get(exchangeName); + + if (rKeys == null) + { + rKeys = new ConcurrentHashMap(); + exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys); + } + + rKeys.put(queueName, routingKey); + } + } + } + else // Create Exchange : AMQShortString exchangeName , AMQShortString Class + { + Map create_exchanges = (Map) createRights.get(CREATE_EXCHANGES_KEY); + + if (create_exchanges == null) + { + create_exchanges = new ConcurrentHashMap(); + createRights.put(CREATE_EXCHANGES_KEY, create_exchanges); + } + + //Should perhaps error if parameters[0] is null; + AMQShortString exchangeName = parameters.length > 0 ? (AMQShortString) parameters[0] : null; + AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : null; + + //Store the exchangeName / class mapping if the mapping is null + createRights.put(exchangeName, className); + } + break; + case DELETE: + break; + + case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey + Map publishRights = (Map) _permissions.get(permission); + + if (publishRights == null) + { + publishRights = new ConcurrentHashMap(); + _permissions.put(permission, publishRights); + } + + if (parameters == null || parameters.length == 0) + { + //If we have no parameters then allow publish to all destinations + // this is signified by having a null value for publish_exchanges + } + else + { + Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY); + + if (publish_exchanges == null) + { + publish_exchanges = new ConcurrentHashMap(); + publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges); + } + + + HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]); + + // Check to see if we have a routing key + if (parameters.length == 2) + { + if (routingKeys == null) + { + routingKeys = new HashSet<AMQShortString>(); + } + //Add routing key to permitted publish destinations + routingKeys.add(parameters[1]); + } + + // Add the updated routingkey list or null if all values allowed + publish_exchanges.put(parameters[0], routingKeys); + } + break; + case PURGE: + break; + case UNBIND: + break; + } + + } + + public boolean authorise(Permission permission, Object... parameters) + { + + switch (permission) + { + case ACCESS: + return true; // This is here for completeness but the SimpleXML ACLManager never calls it. + // The existence of this user specific PP can be validated in the map SimpleXML maintains. + case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey + + Exchange exchange = (Exchange) parameters[1]; + + AMQQueue bind_queueName = (AMQQueue) parameters[2]; + AMQShortString routingKey = (AMQShortString) parameters[3]; + + //Get all Create Rights for this user + Map bindCreateRights = (Map) _permissions.get(Permission.CREATE); + + //Look up the Queue Creation Rights + Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY); + + //Lookup the list of queues + Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY); + + // Check and see if we have a queue white list to check + if (bind_create_queues_queues != null) + { + //There a white list for queues + Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName); + + if (exchangeDetails == null) //Then all queue can be bound to all exchanges. + { + return true; + } + + // Check to see if we have a white list of routingkeys to check + Map rkeys = (Map) exchangeDetails.get(exchange.getName()); + + // if keys is null then any rkey is allowed on this exchange + if (rkeys == null) + { + // There is no routingkey white list + return true; + } + else + { + // We have routingKeys so a match must be found to allowed binding + Iterator keys = rkeys.keySet().iterator(); + + boolean matched = false; + while (keys.hasNext() && !matched) + { + AMQShortString rkey = (AMQShortString) keys.next(); + if (rkey.endsWith("*")) + { + matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString()); + } + else + { + matched = routingKey.equals(rkey); + } + } + + + return matched; + } + + + } + else + { + //There a is no white list for queues + + // So can allow all queues to be bound + // but we should first check and see if we have a temp queue and validate that we are allowed + // to bind temp queues. + + //Check to see if we have a temporary queue + if (bind_queueName.isAutoDelete()) + { + // Check and see if we have an exchange white list. + Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY); + + // If the exchange exists then we must check to see if temporary queues are allowed here + if (bind_exchanges != null) + { + // Check to see if the requested exchange is allowed. + Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName()); + + return (Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY); + } + + //no white list so all allowed, drop through to return true below. + } + + // not a temporary queue and no white list so all allowed. + return true; + } + + case CREATE:// Paramters : QueueDeclareBody || ExchangeDeclareBody + + Map createRights = (Map) _permissions.get(permission); + + // If there are no create rights then deny request + if (createRights == null) + { + return false; + } + + if (parameters.length == 1) + { + if (parameters[0] instanceof QueueDeclareBody) + { + QueueDeclareBody body = (QueueDeclareBody) parameters[0]; + + //Look up the Queue Creation Rights + Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY); + + //Lookup the list of queues allowed to be created + Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY); + + + AMQShortString queueName = body.getQueue(); + + + if (body.getAutoDelete())// we have a temporary queue + { + return (Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY); + } + else + { + // If there is a white list then check + return create_queues_queues == null || create_queues_queues.containsKey(queueName); + } + + } + else if (parameters[0] instanceof ExchangeDeclareBody) + { + ExchangeDeclareBody body = (ExchangeDeclareBody) parameters[0]; + + AMQShortString exchangeName = body.getExchange(); + + Map create_exchanges = (Map) createRights.get(CREATE_EXCHANGES_KEY); + + // If the exchange list is doesn't exist then all is allowed else check the valid exchanges + return create_exchanges == null || create_exchanges.containsKey(exchangeName); + } + } + break; + case CONSUME: // Parameters : AMQQueue + + if (parameters.length == 1 && parameters[0] instanceof AMQQueue) + { + AMQQueue queue = ((AMQQueue) parameters[0]); + Map queuePermissions = (Map) _permissions.get(permission); + + List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY); + + Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY); + Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY); + + // If user is allowed to publish to temporary queues and this is a temp queue then allow it. + if (temporayQueues) + { + if (queue.isAutoDelete()) + // This will allow consumption from any temporary queue including ones not owned by this user. + // Of course the exclusivity will not be broken. + { + // if not limited to ownQueuesOnly then ok else check queue Owner. + return !ownQueuesOnly || queue.getOwner().equals(_user); + } + else + { + return false; + } + } + + // if queues are white listed then ensure it is ok + if (queues != null) + { + // if no queues are listed then ALL are ok othereise it must be specified. + if (ownQueuesOnly) + { + if (queue.getOwner().equals(_user)) + { + return queues.size() == 0 || queues.contains(queue.getName()); + } + else + { + return false; + } + } + + // If we are + return queues.size() == 0 || queues.contains(queue.getName()); + } + } + + // Can't authenticate without the right parameters + return false; + case DELETE: + break; + + case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey + Map publishRights = (Map) _permissions.get(permission); + + if (publishRights == null) + { + return false; + } + + Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY); + + // Having no exchanges listed gives full publish rights to all exchanges + if (exchanges == null) + { + return true; + } + // Otherwise exchange must be listed in the white list + + // If the map doesn't have the exchange then it isn't allowed + if (!exchanges.containsKey(parameters[0])) + { + return false; + } + else + { + + // Get valid routing keys + HashSet routingKeys = (HashSet) exchanges.get(parameters[0]); + + // Having no routingKeys in the map then all are allowed. + if (routingKeys == null) + { + return true; + } + else + { + // We have routingKeys so a match must be found to allowed binding + Iterator keys = routingKeys.iterator(); + + + AMQShortString publishRKey = (AMQShortString)parameters[1]; + + boolean matched = false; + while (keys.hasNext() && !matched) + { + AMQShortString rkey = (AMQShortString) keys.next(); + + if (rkey.endsWith("*")) + { + matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1)); + } + else + { + matched = publishRKey.equals(rkey); + } + } + return matched; + } + } + case PURGE: + break; + case UNBIND: + break; + + } + + return false; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java index 2dc7fcbc1e..a8ae03cc5d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java @@ -18,7 +18,7 @@ * * */ -package org.apache.qpid.server.security.access; +package org.apache.qpid.server.security.access.management; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; @@ -26,6 +26,7 @@ import org.apache.qpid.server.management.MBeanOperation; import org.apache.qpid.server.management.MBeanInvocationHandlerImpl; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.security.access.management.UserManagement; import org.apache.log4j.Logger; import org.apache.commons.configuration.ConfigurationException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java index b8762aa43b..658d7ebbd3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java @@ -18,7 +18,7 @@ * * */ -package org.apache.qpid.server.security.access; +package org.apache.qpid.server.security.access.management; import org.apache.qpid.server.management.MBeanOperation; import org.apache.qpid.server.management.MBeanOperationParameter; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java new file mode 100644 index 0000000000..9b784069dd --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access.plugins; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.access.AccessResult; +import org.apache.qpid.server.security.access.Accessable; +import org.apache.qpid.server.security.access.Permission; +import org.apache.commons.configuration.Configuration; + +public class AllowAll implements ACLPlugin +{ + public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) + { + if (ACLManager.getLogger().isDebugEnabled()) + { + ACLManager.getLogger().debug("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString() + + " on " + body.getClass().getSimpleName() + + (parameters == null || parameters.length == 0 ? "" : "-" + accessablesToString(parameters))); + } + + return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + } + + public static String accessablesToString(Object[] accessObject) + { + StringBuilder sb = new StringBuilder(); + + for (Object access : accessObject) + { + sb.append(access.getClass().getSimpleName() + ":" + access.toString() + ", "); + } + + return sb.delete(sb.length() - 2, sb.length()).toString(); + } + + public String getPluginName() + { + return "AllowAll"; + } + + public void setConfiguaration(Configuration config) + { + //no-op + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java new file mode 100644 index 0000000000..80c125e737 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access.plugins; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.AccessResult; +import org.apache.qpid.server.security.access.Permission; +import org.apache.qpid.AMQConnectionException; +import org.apache.commons.configuration.Configuration; + +public class DenyAll implements ACLPlugin +{ + public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException + { + + if (ACLManager.getLogger().isInfoEnabled()) + { + } + ACLManager.getLogger().info("Denying user:" + session.getAuthorizedID() + " for :" + permission.toString() + + " on " + body.getClass().getSimpleName() + + (parameters == null || parameters.length == 0 ? "" : "-" + AllowAll.accessablesToString(parameters))); + + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "DenyAll Plugin"); + } + + public String getPluginName() + { + return "DenyAll"; + } + + public void setConfiguaration(Configuration config) + { + //no-op + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java new file mode 100644 index 0000000000..251f4e6330 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.server.security.access.plugins; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicPublishBody; + +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.AccessResult; +import org.apache.qpid.server.security.access.Permission; +import org.apache.qpid.server.security.access.PrincipalPermissions; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This uses the default + */ +public class SimpleXML implements ACLPlugin +{ + private Map<String, PrincipalPermissions> _users; + private final AccessResult GRANTED = new AccessResult(this, AccessResult.AccessStatus.GRANTED); + + public SimpleXML() + { + _users = new ConcurrentHashMap<String, PrincipalPermissions>(); + } + + public void setConfiguaration(Configuration config) + { + processConfig(config); + } + + private void processConfig(Configuration config) + { + processPublish(config); + + processConsume(config); + + processCreate(config); + } + + /** + * Publish format takes + * Exchange + Routing Key Pairs + * + * @param config XML Configuration + */ + private void processPublish(Configuration config) + { + Configuration publishConfig = config.subset("security.access_control_list.publish"); + + //Process users that have full publish permission + String[] users = publishConfig.getStringArray("users.user"); + + for (String user : users) + { + grant(Permission.PUBLISH, user); + } + + // Process exchange limited users + int exchangeCount = 0; + Configuration exchangeConfig = publishConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + + while (!exchangeConfig.isEmpty()) + { + //Get Exchange Name + AMQShortString exchangeName = new AMQShortString(exchangeConfig.getString("name")); + + //Get Routing Keys + int keyCount = 0; + Configuration routingkeyConfig = exchangeConfig.subset("routing_keys.routing_key(" + keyCount + ")"); + + while (!routingkeyConfig.isEmpty()) + { + //Get RoutingKey Value + AMQShortString routingKeyValue = new AMQShortString(routingkeyConfig.getString("value")); + + //Apply Exchange + RoutingKey permissions to Users + users = routingkeyConfig.getStringArray("users.user"); + for (String user : users) + { + grant(Permission.PUBLISH, user, exchangeName, routingKeyValue); + } + + //Apply permissions to Groups + + // Check for more configs + keyCount++; + routingkeyConfig = exchangeConfig.subset("routing_keys.routing_key(" + keyCount + ")"); + } + + //Apply Exchange wide permissions to Users + users = exchangeConfig.getStringArray("exchange(" + exchangeCount + ").users.user"); + + for (String user : users) + { + grant(Permission.PUBLISH, user, exchangeName); + } + + //Apply permissions to Groups + exchangeCount++; + exchangeConfig = publishConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + } + } + + private void grant(Permission permission, String user, Object... parameters) + { + PrincipalPermissions permissions = _users.get(user); + + if (permissions == null) + { + permissions = new PrincipalPermissions(user); + } + + _users.put(user, permissions); + permissions.grant(permission, parameters); + } + + private void processConsume(Configuration config) + { + Configuration consumeConfig = config.subset("security.access_control_list.consume"); + + // Process queue limited users + int queueCount = 0; + Configuration queueConfig = consumeConfig.subset("queues.queue(" + queueCount + ")"); + + while (!queueConfig.isEmpty()) + { + //Get queue Name + AMQShortString queueName = new AMQShortString(queueConfig.getString("name")); + // if there is no name then there may be a temporary element + boolean temporary = queueConfig.containsKey("temporary"); + boolean ownQueues = queueConfig.containsKey("own_queues"); + + //Process permissions for this queue + String[] users = queueConfig.getStringArray("users.user"); + for (String user : users) + { + grant(Permission.CONSUME, user, queueName, temporary, ownQueues); + } + + //See if we have another config + queueCount++; + queueConfig = consumeConfig.subset("queues.queue(" + queueCount + ")"); + } + + // Process users that have full consume permission + String[] users = consumeConfig.getStringArray("users.user"); + + for (String user : users) + { + grant(Permission.CONSUME, user); + } + } + + private void processCreate(Configuration config) + { + Configuration createConfig = config.subset("security.access_control_list.create"); + + // Process create permissions for queue creation + int queueCount = 0; + Configuration queueConfig = createConfig.subset("queues.queue(" + queueCount + ")"); + + while (!queueConfig.isEmpty()) + { + //Get queue Name + AMQShortString queueName = new AMQShortString(queueConfig.getString("name")); + + // if there is no name then there may be a temporary element + boolean temporary = queueConfig.containsKey("temporary"); + + int exchangeCount = 0; + Configuration exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + + while (!exchangeConfig.isEmpty()) + { + + AMQShortString exchange = new AMQShortString(exchangeConfig.getString("name")); + AMQShortString routingKey = new AMQShortString(exchangeConfig.getString("routing_key")); + + //Process permissions for this queue + String[] users = exchangeConfig.getStringArray("users.user"); + for (String user : users) + { + grant(Permission.CREATE, user, temporary, + (queueName.equals("") ? null : queueName), + (exchange.equals("") ? null : exchange), + (routingKey.equals("") ? null : routingKey)); + } + + //See if we have another config + exchangeCount++; + exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + } + + // Process users that are not bound to an exchange + String[] users = queueConfig.getStringArray("users.user"); + + for (String user : users) + { + grant(Permission.CREATE, user, temporary, queueName); + } + + //See if we have another config + queueCount++; + queueConfig = createConfig.subset("queues.queue(" + queueCount + ")"); + } + + // Process create permissions for exchange creation + int exchangeCount = 0; + Configuration exchangeConfig = createConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + + while (!exchangeConfig.isEmpty()) + { + AMQShortString exchange = new AMQShortString(exchangeConfig.getString("name")); + AMQShortString clazz = new AMQShortString(exchangeConfig.getString("class")); + + //Process permissions for this queue + String[] users = exchangeConfig.getStringArray("users.user"); + for (String user : users) + { + grant(Permission.CREATE, user, exchange, clazz); + } + + //See if we have another config + exchangeCount++; + exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + } + + // Process users that have full create permission + String[] users = createConfig.getStringArray("users.user"); + + for (String user : users) + { + grant(Permission.CREATE, user); + } + + + } + + public String getPluginName() + { + return "Simple"; + } + + public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException + { + String error = ""; + + if (ACLManager.getLogger().isInfoEnabled()) + { + ACLManager.getLogger().info("Simple Authorisation processing user:" + session.getAuthorizedID() + " for :" + permission.toString() + + " on " + body.getClass().getSimpleName() + + (parameters == null || parameters.length == 0 ? "" : "-" + AllowAll.accessablesToString(parameters))); + } + + String username = session.getAuthorizedID().getName(); + + //Get the Users Permissions + PrincipalPermissions permissions = _users.get(username); + + if (permissions != null) + { + switch (permission) + { + case ACCESS: + return GRANTED; + case BIND: // Body QueueDeclareBody - Parameters : Exchange, Queue, QueueName + // Body QueueBindBody - Paramters : Exchange, Queue, QueueName + if (parameters.length == 3) + { + // Parameters : Exchange, Queue, RoutingKey + if (permissions.authorise(Permission.BIND, body, parameters[0], parameters[1], parameters[2])) + { + return GRANTED; + } + } + break; + case CONSUME: // Parameters : none + if (parameters.length == 1 && permissions.authorise(Permission.CONSUME, parameters[0])) + { + return GRANTED; + } + break; + case CREATE: // Body : QueueDeclareBody | ExchangeDeclareBody - Parameters : none + if (permissions.authorise(Permission.CREATE, body)) + { + return GRANTED; + } + break; + case PUBLISH: // Body : BasicPublishBody Parameters : exchange + if (parameters.length == 1 && parameters[0] instanceof Exchange) + { + if (permissions.authorise(Permission.PUBLISH, ((Exchange) parameters[0]).getName(), + ((BasicPublishBody) body).getRoutingKey())) + { + return GRANTED; + } + } + break; + case PURGE: + break; + case DELETE: + break; + case UNBIND: + break; + } + } + + //todo potential refactor this ConnectionException Out of here + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java index 10adfdd9fc..348bccb4e9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser; -import org.apache.qpid.server.security.access.AMQUserManagementMBean; +import org.apache.qpid.server.security.access.management.AMQUserManagementMBean; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.EncoderException; @@ -45,7 +45,6 @@ import java.util.LinkedList; import java.util.concurrent.locks.ReentrantLock; import java.security.Principal; import java.security.NoSuchAlgorithmException; -import java.security.MessageDigest; /** * Represents a user database where the account information is stored in a simple flat file. diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java index 2d3f5e5131..15c62a62e4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java @@ -37,7 +37,7 @@ import org.apache.qpid.configuration.PropertyException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; -import org.apache.qpid.server.security.access.AMQUserManagementMBean; +import org.apache.qpid.server.security.access.management.AMQUserManagementMBean; import org.apache.qpid.AMQException; import javax.management.JMException; @@ -50,17 +50,16 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab Map<String, PrincipalDatabase> _databases; - public ConfigurationFilePrincipalDatabaseManager() throws Exception + public ConfigurationFilePrincipalDatabaseManager(Configuration configuration) throws Exception { _logger.info("Initialising PrincipleDatabase authentication manager"); - _databases = initialisePrincipalDatabases(); + _databases = initialisePrincipalDatabases(configuration); } - private Map<String, PrincipalDatabase> initialisePrincipalDatabases() throws Exception + private Map<String, PrincipalDatabase> initialisePrincipalDatabases(Configuration configuration) throws Exception { - Configuration config = ApplicationRegistry.getInstance().getConfiguration(); - List<String> databaseNames = config.getList(_base + ".name"); - List<String> databaseClasses = config.getList(_base + ".class"); + List<String> databaseNames = configuration.getList(_base + ".name"); + List<String> databaseClasses = configuration.getList(_base + ".class"); Map<String, PrincipalDatabase> databases = new HashMap<String, PrincipalDatabase>(); if (databaseNames.size() == 0) @@ -85,7 +84,7 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab throw new Exception("Principal databases must implement the PrincipalDatabase interface"); } - initialisePrincipalDatabase((PrincipalDatabase) o, config, i); + initialisePrincipalDatabase((PrincipalDatabase) o, configuration, i); String name = databaseNames.get(i); if ((name == null) || (name.length() == 0)) @@ -200,7 +199,7 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab } String jmxaccesssFile = null; - + try { jmxaccesssFile = PropertyUtils.replaceProperties(jmxaccesslist.get(0)); diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java deleted file mode 100644 index 5c372f6c2c..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.auth.database; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.security.access.AccessManager; -import org.apache.qpid.server.security.access.AccessResult; -import org.apache.qpid.server.security.access.AccessRights; -import org.apache.qpid.server.security.access.Accessable; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; -import java.security.Principal; - -/** - * Represents a user database where the account information is stored in a simple flat file. - * - * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn - * - * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text. - */ -public class PlainPasswordVhostFilePrincipalDatabase extends PlainPasswordFilePrincipalDatabase implements AccessManager -{ - private static final Logger _logger = Logger.getLogger(PlainPasswordVhostFilePrincipalDatabase.class); - - /** - * Looks up the virtual hosts for a specified user in the password file. - * - * @param user The user to lookup - * - * @return a list of virtualhosts - */ - private String[] lookupVirtualHost(String user) - { - try - { - BufferedReader reader = null; - try - { - reader = new BufferedReader(new FileReader(_passwordFile)); - String line; - - while ((line = reader.readLine()) != null) - { - if (!line.startsWith("#")) - { - String[] result = _regexp.split(line); - if (result == null || result.length < 3) - { - continue; - } - - if (user.equals(result[0])) - { - return result[2].split(","); - } - } - } - return null; - } - finally - { - if (reader != null) - { - reader.close(); - } - } - } - catch (IOException ioe) - { - //ignore - } - return null; - } - - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); - } - - public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights) - { - - if (accessObject instanceof VirtualHost) - { - String[] hosts = lookupVirtualHost(user.getName()); - - if (hosts != null) - { - for (String host : hosts) - { - if (accessObject.getAccessableName().equals(host)) - { - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); - } - } - } - } - - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - - public String getName() - { - return "PlainPasswordVhostFile"; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java index 73d58ca489..c8a4add0f1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java @@ -69,10 +69,14 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase { throw new IllegalArgumentException("principal must not be null"); } - char[] pwd = _users.getProperty(principal.getName()).toCharArray(); + + + + final String pwd = _users.getProperty(principal.getName()); + if (pwd != null) { - callback.setPassword(pwd); + callback.setPassword(pwd.toCharArray()); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index ce5e0cd748..f589140e8e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -82,14 +82,6 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan if (databaseName == null) { - if (hostConfig instanceof SubsetConfiguration) - { - _logger.warn("No authentication specified for '" + ((SubsetConfiguration) hostConfig).getPrefix() + "'. Using Default authentication manager"); - } - else - { - _logger.warn("No authentication specified. Using Default authentication manager"); - } _default = ApplicationRegistry.getInstance().getAuthenticationManager(); return; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java index 1dcbb02d5c..23aaf56876 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java @@ -94,7 +94,7 @@ public class ConnectorConfiguration public String certType; @Configured(path = "connector.qpidnio", - defaultValue = "true") + defaultValue = "false") public boolean _multiThreadNIO; diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 047cef9064..1e4b69c935 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -49,11 +49,11 @@ public class NonTransactionalContext implements TransactionalContext /** Where to put undeliverable messages */ private final List<RequiredDeliveryException> _returnMessages; - private Set<Long> _browsedAcks; + private final Set<Long> _browsedAcks; private final MessageStore _messageStore; - private StoreContext _storeContext; + private final StoreContext _storeContext; /** Whether we are in a transaction */ private boolean _inTran; diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java index b4c66aa24d..6016ecc1a5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java @@ -24,7 +24,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoreContext; diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index e9fa642175..0acfa84f31 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -35,8 +35,8 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; -import org.apache.qpid.server.security.access.AccessManager; -import org.apache.qpid.server.security.access.AllowAll; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.plugins.AllowAll; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -48,7 +48,7 @@ public class NullApplicationRegistry extends ApplicationRegistry private VirtualHostRegistry _virtualHostRegistry; - private AccessManager _accessManager; + private ACLPlugin _accessManager; private PrincipalDatabaseManager _databaseManager; @@ -116,7 +116,7 @@ public class NullApplicationRegistry extends ApplicationRegistry return _virtualHostRegistry; } - public AccessManager getAccessManager() + public ACLPlugin getAccessManager() { return _accessManager; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 8d6a26fdbc..3ff9b8c356 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -1,260 +1,308 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
-import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-
-public class VirtualHost implements Accessable
-{
- private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
-
- private final String _name;
-
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private MessageStore _messageStore;
-
- protected VirtualHostMBean _virtualHostMBean;
-
- private AMQBrokerManagerMBean _brokerMBean;
-
- private AuthenticationManager _authenticationManager;
-
- private AccessManager _accessManager;
-
-
- public void setAccessableName(String name)
- {
- _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
- + name + ") ignored remains :" + getAccessableName());
- }
-
- public String getAccessableName()
- {
- return _name;
- }
-
-
- /**
- * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
- {
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, "VirtualHost");
- }
-
- public String getObjectInstanceName()
- {
- return _name.toString();
- }
-
- public String getName()
- {
- return _name.toString();
- }
-
- public VirtualHost getVirtualHost()
- {
- return VirtualHost.this;
- }
-
-
- } // End of MBean class
-
- /**
- * Used for testing only
- * @param name
- * @param store
- * @throws Exception
- */
- public VirtualHost(String name, MessageStore store) throws Exception
- {
- this(name, null, store);
- }
-
- /**
- * Normal Constructor
- * @param name
- * @param hostConfig
- * @throws Exception
- */
- public VirtualHost(String name, Configuration hostConfig) throws Exception
- {
- this(name, hostConfig, null);
- }
-
- private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
- {
- _name = name;
-
- _virtualHostMBean = new VirtualHostMBean();
- // This isn't needed to be registered
- //_virtualHostMBean.register();
-
- _queueRegistry = new DefaultQueueRegistry(this);
- _exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeFactory.initialise(hostConfig);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
-
- if (store != null)
- {
- _messageStore = store;
- }
- else
- {
- if (hostConfig == null)
- {
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
- }
- initialiseMessageStore(hostConfig);
- }
-
- _exchangeRegistry.initialise();
-
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
-
- _accessManager = new AccessManagerImpl(name, hostConfig);
-
- _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _brokerMBean.register();
- }
-
- private void initialiseMessageStore(Configuration config) throws Exception
- {
- String messageStoreClass = config.getString("store.class");
-
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
-
- if (!(o instanceof MessageStore))
- {
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- _messageStore = (MessageStore) o;
- _messageStore.configure(this, "store", config);
- }
-
-
- public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
- {
- T instance;
- try
- {
- instance = instanceType.newInstance();
- }
- catch (Exception e)
- {
- _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
- }
- Configurator.configure(instance);
-
- return instance;
- }
-
-
- public String getName()
- {
- return _name;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public ApplicationRegistry getApplicationRegistry()
- {
- throw new UnsupportedOperationException();
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
- public AccessManager getAccessManager()
- {
- return _accessManager;
- }
-
- public void close() throws Exception
- {
- if (_messageStore != null)
- {
- _messageStore.close();
- }
- }
-
- public ManagedObject getBrokerMBean()
- {
- return _brokerMBean;
- }
-
- public ManagedObject getManagedObject()
- {
- return _virtualHostMBean;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import javax.management.NotCompliantMBeanException; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.server.AMQBrokerManagerMBean; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.access.Accessable; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.AMQException; + +import java.util.Timer; +import java.util.TimerTask; + +public class VirtualHost implements Accessable +{ + private static final Logger _logger = Logger.getLogger(VirtualHost.class); + + + private final String _name; + + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private MessageStore _messageStore; + + protected VirtualHostMBean _virtualHostMBean; + + private AMQBrokerManagerMBean _brokerMBean; + + private AuthenticationManager _authenticationManager; + + private ACLPlugin _accessManager; + + private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true); + + private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; + + public void setAccessableName(String name) + { + _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" + + name + ") ignored remains :" + getAccessableName()); + } + + public String getAccessableName() + { + return _name; + } + + + /** + * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any + * implementaion of an Exchange MBean should extend this class. + */ + public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost + { + public VirtualHostMBean() throws NotCompliantMBeanException + { + super(ManagedVirtualHost.class, "VirtualHost"); + } + + public String getObjectInstanceName() + { + return _name.toString(); + } + + public String getName() + { + return _name.toString(); + } + + public VirtualHost getVirtualHost() + { + return VirtualHost.this; + } + + + } // End of MBean class + + /** + * Used for testing only + * @param name + * @param store + * @throws Exception + */ + public VirtualHost(String name, MessageStore store) throws Exception + { + this(name, new PropertiesConfiguration(), store); + } + + /** + * Normal Constructor + * @param name + * @param hostConfig + * @throws Exception + */ + public VirtualHost(String name, Configuration hostConfig) throws Exception + { + this(name, hostConfig, null); + } + + public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception + { + _name = name; + + _virtualHostMBean = new VirtualHostMBean(); + // This isn't needed to be registered + //_virtualHostMBean.register(); + + _queueRegistry = new DefaultQueueRegistry(this); + _exchangeFactory = new DefaultExchangeFactory(this); + _exchangeFactory.initialise(hostConfig); + _exchangeRegistry = new DefaultExchangeRegistry(this); + + if (store != null) + { + _messageStore = store; + } + else + { + if (hostConfig == null) + { + throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); + } + initialiseMessageStore(hostConfig); + } + + _exchangeRegistry.initialise(); + + _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); + + _accessManager = ACLManager.loadACLManager(name, hostConfig); + + _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); + _brokerMBean.register(); + initialiseHouseKeeping(hostConfig); + } + + private void initialiseHouseKeeping(final Configuration hostConfig) + { + + long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD); + + /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ + if(period != 0L) + { + class RemoveExpiredMessagesTask extends TimerTask + { + public void run() + { + for(AMQQueue q : _queueRegistry.getQueues()) + { + + try + { + q.removeExpiredIfNoSubscribers(); + } + catch (AMQException e) + { + _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e); + throw new RuntimeException(e); + } + } + } + } + + _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), + period/2, + period); + } + } + + private void initialiseMessageStore(Configuration config) throws Exception + { + String messageStoreClass = config.getString("store.class"); + + Class clazz = Class.forName(messageStoreClass); + Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) + { + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + " does not."); + } + _messageStore = (MessageStore) o; + _messageStore.configure(this, "store", config); + } + + + public <T> T getConfiguredObject(Class<T> instanceType, Configuration config) + { + T instance; + try + { + instance = instanceType.newInstance(); + } + catch (Exception e) + { + _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); + throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); + } + Configurator.configure(instance); + + return instance; + } + + + public String getName() + { + return _name; + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public ApplicationRegistry getApplicationRegistry() + { + throw new UnsupportedOperationException(); + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + + public AuthenticationManager getAuthenticationManager() + { + return _authenticationManager; + } + + public ACLPlugin getAccessManager() + { + return _accessManager; + } + + public void close() throws Exception + { + if (_houseKeepingTimer != null) + { + _houseKeepingTimer.cancel(); + } + if (_messageStore != null) + { + _messageStore.close(); + } + } + + public ManagedObject getBrokerMBean() + { + return _brokerMBean; + } + + public ManagedObject getManagedObject() + { + return _virtualHostMBean; + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java new file mode 100644 index 0000000000..a0304a7b01 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java @@ -0,0 +1,128 @@ +package org.apache.qpid.server; + +import junit.framework.TestCase; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.AMQException;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +public class SelectorParserTest extends TestCase +{ + public void testSelectorWithHyphen() + { + testPass("Cost = 2 AND \"property-with-hyphen\" = 'wibble'"); + } + + public void testLike() + { + testFail("Cost LIKE 2"); + testPass("Cost LIKE 'Hello'"); + } + + public void testStringQuoted() + { + testPass("string = 'Test'"); + } + + public void testProperty() + { + testPass("prop1 = prop2"); + } + + public void testPropertyNames() + { + testPass("$min= TRUE AND _max= FALSE AND Prop_2 = true AND prop$3 = false"); + } + + public void testProtected() + { + testFail("NULL = 0 "); + testFail("TRUE = 0 "); + testFail("FALSE = 0 "); + testFail("NOT = 0 "); + testFail("AND = 0 "); + testFail("OR = 0 "); + testFail("BETWEEN = 0 "); + testFail("LIKE = 0 "); + testFail("IN = 0 "); + testFail("IS = 0 "); + testFail("ESCAPE = 0 "); + } + + + public void testBoolean() + { + testPass("min= TRUE AND max= FALSE "); + testPass("min= true AND max= false"); + } + + public void testDouble() + { + testPass("positive=31E2 AND negative=-31.4E3"); + testPass("min=" + Double.MIN_VALUE + " AND max=" + Double.MAX_VALUE); + } + + public void testLong() + { + testPass("minLong=" + Long.MIN_VALUE + "L AND maxLong=" + Long.MAX_VALUE + "L"); + } + + public void testInt() + { + testPass("minInt=" + Integer.MIN_VALUE + " AND maxInt=" + Integer.MAX_VALUE); + } + + public void testSigned() + { + testPass("negative=-42 AND positive=+42"); + } + + public void testOctal() + { + testPass("octal=042"); + } + + + private void testPass(String selector) + { + try + { + new JMSSelectorFilter(selector); + } + catch (AMQException e) + { + fail("Selector '" + selector + "' was not parsed :" + e.getMessage()); + } + } + + private void testFail(String selector) + { + try + { + new JMSSelectorFilter(selector); + fail("Selector '" + selector + "' was parsed "); + } + catch (AMQException e) + { + //normal path + } + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index 8e5879a51e..7e2d56b460 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -589,6 +589,11 @@ public class DestWildExchangeTest extends TestCase return null; } + public void setExchange(AMQShortString exchange) + { + + } + public boolean isImmediate() { return false; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 81b0ae2213..ed79384d42 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -173,7 +173,7 @@ public class AMQQueueAlertTest extends TestCase public void testQueueDepthAlertWithSubscribers() throws Exception { protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore); protocolSession.addChannel(channel); // Create queue @@ -242,6 +242,11 @@ public class AMQQueueAlertTest extends TestCase return null; } + public void setExchange(AMQShortString exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public boolean isImmediate() { return immediate; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index d86c90bdae..c02b47e9fd 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -123,7 +123,7 @@ public class AMQQueueMBeanTest extends TestCase TestMinaProtocolSession protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore); protocolSession.addChannel(channel); _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false); @@ -234,6 +234,11 @@ public class AMQQueueMBeanTest extends TestCase return null; } + public void setExchange(AMQShortString exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public boolean isImmediate() { return immediate; |