diff options
28 files changed, 822 insertions, 211 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java new file mode 100755 index 0000000000..07aa13b10d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import java.security.Principal; + +public interface PrincipalHolder +{ + /** @return a Principal that was used to authorized this session */ + Principal getPrincipal(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java index dec0bca576..b021fece57 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java @@ -66,7 +66,9 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { - + _bytesCredit = 0l; + _messageCredit = 0l; + setSuspended(true); } @@ -100,6 +102,10 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl } + public void clearCredit() + { + //To change body of implemented methods use File | Settings | File Templates. + } public synchronized boolean hasCredit() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java index 7b91894526..a3509ee02d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -201,4 +201,11 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl _messageCreditLimit = -1; } } + + public void clearCredit() + { + _bytesCreditLimit = 0l; + _messageCreditLimit = 0l; + setSuspended(true); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 205ca73f13..2db16ef751 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -677,7 +677,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public String toString() { - return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); + return _minaProtocolSession.getRemoteAddress() + "(" + (getPrincipal() == null ? "?" : getPrincipal().getName() + ")"); } public String dump() @@ -822,7 +822,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _authorizedID = authorizedID; } - public Principal getAuthorizedID() + public Principal getPrincipal() { return _authorizedID; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 1bac601225..1e8dd9f77a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -28,13 +28,14 @@ import org.apache.qpid.framing.*; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.PrincipalHolder; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.Principal; -public interface AMQProtocolSession extends AMQVersionAwareProtocolSession +public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder { public static final class ProtocolSessionIdentifier @@ -195,9 +196,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession void setAuthorizedID(Principal authorizedID); - /** @return a Principal that was used to authorized this session */ - Principal getAuthorizedID(); - public MethodRegistry getMethodRegistry(); public MethodDispatcher getMethodDispatcher(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 65235ba9b9..3e1477fc1c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -129,7 +129,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public String getAuthorizedId() { - return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null; + return (_session.getPrincipal() != null ) ? _session.getPrincipal().getName() : null; } public String getVersion() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index bb6ce65d42..0339245b04 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -39,6 +39,15 @@ public class AMQPriorityQueue extends SimpleAMQQueue super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities)); } + public AMQPriorityQueue(String queueName, + boolean durable, + String owner, + boolean autoDelete, + VirtualHost virtualHost, int priorities) throws AMQException + { + this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities); + } + public int getPriorities() { return ((PriorityQueueList) _entries).getPriorities(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 7509350e65..f31fbd6ad0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -26,6 +26,8 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Map; + public class AMQQueueFactory { @@ -83,4 +85,39 @@ public class AMQQueueFactory q.configure(config); return q; } + + public static AMQQueue createAMQQueueImpl(String queueName, + boolean durable, + String owner, + boolean autoDelete, + VirtualHost virtualHost, Map<String, Object> arguments) + throws AMQException + { + int priorities = 1; + if(arguments.containsKey(X_QPID_PRIORITIES)) + { + Object prioritiesObj = arguments.get(X_QPID_PRIORITIES); + if(prioritiesObj instanceof Number) + { + priorities = ((Number)prioritiesObj).intValue(); + } + } + + + AMQQueue q = null; + if(priorities > 1) + { + q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, virtualHost, priorities); + } + else + { + q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, virtualHost); + } + + //Register the new queue + virtualHost.getQueueRegistry().registerQueue(q); + q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName)); + return q; + + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 5babc3e3d4..8553f8bf16 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -28,7 +28,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; @@ -188,7 +187,7 @@ public class IncomingMessage implements Filterable, InboundMessage AMQShortString userID = getContentHeader().properties instanceof BasicContentHeaderProperties ? ((BasicContentHeaderProperties) getContentHeader().properties).getUserId() : null; - if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString())) + if (MSG_AUTH && !_publisher.getPrincipal().getName().equals(userID == null? "" : userID.toString())) { throw new UnauthorizedAccessException("Acccess Refused",message); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 3d694b7137..86d1411450 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -164,6 +164,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost) + throws AMQException + { + this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost); + } + public void resetNotifications() { // This ensure that the notification checks for the configured alerts are created. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java index 6f7f66fad2..c53403d0df 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java @@ -41,6 +41,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; import org.apache.qpid.server.security.access.plugins.SimpleXML; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.PrincipalHolder; public class ACLManager { @@ -152,7 +153,7 @@ public class ACLManager return true; } - public boolean authoriseBind(final AMQProtocolSession session, final Exchange exch, final AMQQueue queue, + public boolean authoriseBind(final PrincipalHolder session, final Exchange exch, final AMQQueue queue, final AMQShortString routingKey) { return checkAllPlugins(new AccessCheck() @@ -167,7 +168,7 @@ public class ACLManager }); } - public boolean authoriseConnect(final AMQProtocolSession session, final VirtualHost virtualHost) + public boolean authoriseConnect(final PrincipalHolder session, final VirtualHost virtualHost) { return checkAllPlugins(new AccessCheck() { @@ -181,7 +182,7 @@ public class ACLManager }); } - public boolean authoriseConsume(final AMQProtocolSession session, final boolean noAck, final AMQQueue queue) + public boolean authoriseConsume(final PrincipalHolder session, final boolean noAck, final AMQQueue queue) { return checkAllPlugins(new AccessCheck() { @@ -195,7 +196,7 @@ public class ACLManager }); } - public boolean authoriseConsume(final AMQProtocolSession session, final boolean exclusive, final boolean noAck, + public boolean authoriseConsume(final PrincipalHolder session, final boolean exclusive, final boolean noAck, final boolean noLocal, final boolean nowait, final AMQQueue queue) { return checkAllPlugins(new AccessCheck() @@ -210,7 +211,7 @@ public class ACLManager }); } - public boolean authoriseCreateExchange(final AMQProtocolSession session, final boolean autoDelete, + public boolean authoriseCreateExchange(final PrincipalHolder session, final boolean autoDelete, final boolean durable, final AMQShortString exchangeName, final boolean internal, final boolean nowait, final boolean passive, final AMQShortString exchangeType) { @@ -227,7 +228,7 @@ public class ACLManager }); } - public boolean authoriseCreateQueue(final AMQProtocolSession session, final boolean autoDelete, + public boolean authoriseCreateQueue(final PrincipalHolder session, final boolean autoDelete, final boolean durable, final boolean exclusive, final boolean nowait, final boolean passive, final AMQShortString queue) { @@ -243,7 +244,7 @@ public class ACLManager }); } - public boolean authoriseDelete(final AMQProtocolSession session, final AMQQueue queue) + public boolean authoriseDelete(final PrincipalHolder session, final AMQQueue queue) { return checkAllPlugins(new AccessCheck() { @@ -257,7 +258,7 @@ public class ACLManager }); } - public boolean authoriseDelete(final AMQProtocolSession session, final Exchange exchange) + public boolean authoriseDelete(final PrincipalHolder session, final Exchange exchange) { return checkAllPlugins(new AccessCheck() { @@ -271,7 +272,7 @@ public class ACLManager }); } - public boolean authorisePublish(final AMQProtocolSession session, final boolean immediate, final boolean mandatory, + public boolean authorisePublish(final PrincipalHolder session, final boolean immediate, final boolean mandatory, final AMQShortString routingKey, final Exchange e) { return checkAllPlugins(new AccessCheck() @@ -286,7 +287,7 @@ public class ACLManager }); } - public boolean authorisePurge(final AMQProtocolSession session, final AMQQueue queue) + public boolean authorisePurge(final PrincipalHolder session, final AMQQueue queue) { return checkAllPlugins(new AccessCheck() { @@ -300,7 +301,7 @@ public class ACLManager }); } - public boolean authoriseUnbind(final AMQProtocolSession session, final Exchange exch, + public boolean authoriseUnbind(final PrincipalHolder session, final Exchange exch, final AMQShortString routingKey, final AMQQueue queue) { return checkAllPlugins(new AccessCheck() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java index 032184ec39..372a9f37a1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.PrincipalHolder; public interface ACLPlugin { @@ -41,30 +42,30 @@ public interface ACLPlugin // These return true if the plugin thinks the action should be allowed, and false if not. - AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue, AMQShortString routingKey); + AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey); - AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable, + AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable, AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType); - AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable, boolean exclusive, + AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable, boolean exclusive, boolean nowait, boolean passive, AMQShortString queue); - AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost); + AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost); - AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue); + AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue); - AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal, + AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal, boolean nowait, AMQQueue queue); - AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue); + AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue); - AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange); + AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange); - AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory, + AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory, AMQShortString routingKey, Exchange e); - AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue); + AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue); - AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey, AMQQueue queue); + AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey, AMQQueue queue); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java index 682135bc25..a66e4f3684 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.PrincipalHolder; /** * This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations. @@ -35,63 +36,63 @@ public abstract class AbstractACLPlugin implements ACLPlugin private static final AuthzResult DEFAULT_ANSWER = AuthzResult.ABSTAIN; - public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue, + public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey) { return DEFAULT_ANSWER; } - public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost) + public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost) { return DEFAULT_ANSWER; } - public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue) + public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue) { return DEFAULT_ANSWER; } - public AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal, + public AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal, boolean nowait, AMQQueue queue) { return DEFAULT_ANSWER; } - public AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable, + public AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable, AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType) { // TODO Auto-generated method stub return null; } - public AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable, + public AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable, boolean exclusive, boolean nowait, boolean passive, AMQShortString queue) { return DEFAULT_ANSWER; } - public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue) + public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue) { return DEFAULT_ANSWER; } - public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange) + public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange) { return DEFAULT_ANSWER; } - public AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory, + public AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory, AMQShortString routingKey, Exchange e) { return DEFAULT_ANSWER; } - public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue) + public AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue) { return DEFAULT_ANSWER; } - public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey, + public AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey, AMQQueue queue) { return DEFAULT_ANSWER; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java index f7e537b02b..5f52805414 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.PrincipalHolder; public abstract class BasicACLPlugin implements ACLPlugin { @@ -35,37 +36,32 @@ public abstract class BasicACLPlugin implements ACLPlugin // Returns true or false if the plugin should authorise or deny the request protected abstract AuthzResult getResult(); - @Override - public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, + public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey) { return getResult(); } - @Override - public AuthzResult authoriseConnect(AMQProtocolSession session, + public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost) { return getResult(); } - @Override - public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, + public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue) { return getResult(); } - @Override - public AuthzResult authoriseConsume(AMQProtocolSession session, + public AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal, boolean nowait, AMQQueue queue) { return getResult(); } - @Override - public AuthzResult authoriseCreateExchange(AMQProtocolSession session, + public AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable, AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType) @@ -73,48 +69,41 @@ public abstract class BasicACLPlugin implements ACLPlugin return getResult(); } - @Override - public AuthzResult authoriseCreateQueue(AMQProtocolSession session, + public AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable, boolean exclusive, boolean nowait, boolean passive, AMQShortString queue) { return getResult(); } - @Override - public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue) + public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue) { return getResult(); } - @Override - public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange) + public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange) { return getResult(); } - @Override - public AuthzResult authorisePublish(AMQProtocolSession session, + public AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory, AMQShortString routingKey, Exchange e) { return getResult(); } - @Override - public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue) + public AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue) { return getResult(); } - @Override - public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, + public AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey, AMQQueue queue) { return getResult(); } - @Override public void setConfiguration(Configuration config) { // no-op diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java index 26a76c9af1..77d3c4bcdf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java @@ -54,7 +54,7 @@ public class DenyAll extends BasicACLPlugin if (ACLManager.getLogger().isInfoEnabled()) { ACLManager.getLogger().info( - "Denying user:" + session.getAuthorizedID()); + "Denying user:" + session.getPrincipal()); } throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "DenyAll Plugin"); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java index 2cc0c530de..3f8ab8d9e6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java @@ -22,25 +22,18 @@ package org.apache.qpid.server.security.access.plugins; import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.ACLPluginFactory; import org.apache.qpid.server.security.access.AccessResult; import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.security.access.PrincipalPermissions; -import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.PrincipalHolder; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -290,9 +283,9 @@ public class SimpleXML implements ACLPlugin return "Simple"; } - public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue, AMQShortString routingKey) + public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; @@ -303,9 +296,9 @@ public class SimpleXML implements ACLPlugin } } - public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost) + public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; @@ -316,9 +309,9 @@ public class SimpleXML implements ACLPlugin } } - public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue) + public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; @@ -329,16 +322,16 @@ public class SimpleXML implements ACLPlugin } } - public AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal, + public AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal, boolean nowait, AMQQueue queue) { return authoriseConsume(session, noAck, queue); } - public AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable, + public AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable, AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; @@ -349,10 +342,10 @@ public class SimpleXML implements ACLPlugin } } - public AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable, boolean exclusive, + public AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable, boolean exclusive, boolean nowait, boolean passive, AMQShortString queue) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; @@ -363,9 +356,9 @@ public class SimpleXML implements ACLPlugin } } - public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue) + public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; @@ -376,9 +369,9 @@ public class SimpleXML implements ACLPlugin } } - public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange) + public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; @@ -389,10 +382,10 @@ public class SimpleXML implements ACLPlugin } } - public AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory, + public AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory, AMQShortString routingKey, Exchange e) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; @@ -403,9 +396,9 @@ public class SimpleXML implements ACLPlugin } } - public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue) + public AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; @@ -416,9 +409,9 @@ public class SimpleXML implements ACLPlugin } } - public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey, AMQQueue queue) + public AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey, AMQQueue queue) { - PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName()); + PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName()); if (principalPermissions == null) { return AuthzResult.DENIED; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java index 810be8ae22..438bd696c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.ACLPluginFactory; import org.apache.qpid.server.security.access.plugins.AbstractACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.PrincipalHolder; import org.apache.qpid.util.NetMatcher; public class FirewallPlugin extends AbstractACLPlugin @@ -178,7 +179,7 @@ public class FirewallPlugin extends AbstractACLPlugin private FirewallRule[] _rules; @Override - public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost) + public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost) { if (!(session instanceof AMQMinaProtocolSession)) { @@ -226,7 +227,6 @@ public class FirewallPlugin extends AbstractACLPlugin } } - @Override public void setConfiguration(Configuration config) throws ConfigurationException { // Get default action diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 2004478ed4..5cecf0cd05 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -311,7 +311,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { public void onAccept() { - acknowledge(entry); + _session.acknowledge(Subscription_0_10.this,entry); } public void onRelease() @@ -432,6 +432,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); } _stopped.set(true); + FlowCreditManager_0_10 creditManager = getCreditManager(); + creditManager.clearCredit(); } public void addCredit(MessageCreditUnit unit, long value) @@ -507,4 +509,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } + + public void flush() throws AMQException + { + _queue.flushSubscription(this); + stop(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index bbc2396063..8fc426a6f6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -23,13 +23,24 @@ package org.apache.qpid.server.transport; import org.apache.qpid.transport.*; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.txn.Transaction; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.PrincipalHolder; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.security.Principal; + import static org.apache.qpid.util.Serial.*; +import com.sun.security.auth.UserPrincipal; -public class ServerSession extends Session +public class ServerSession extends Session implements PrincipalHolder { @@ -42,36 +53,60 @@ public class ServerSession extends Session public void onReject(); } + public static interface Task + { + public void doTask(ServerSession session); + } + private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); + private Transaction _transaction; + + private Principal _principal; + + private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>(); + + private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + + ServerSession(Connection connection, Binary name, long expiry) { super(connection, name, expiry); + _transaction = new AutoCommitTransaction(); + _principal = new UserPrincipal(connection.getAuthorizationID()); } ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { super(connection, delegate, name, expiry); + _transaction = new AutoCommitTransaction(); } - public void enqueue(ServerMessage message, ArrayList<AMQQueue> queues) + public void enqueue(final ServerMessage message, ArrayList<AMQQueue> queues) { - // TODO Txn - try + for(final AMQQueue q : queues) { - for(AMQQueue q : queues) + _transaction.enqueue(q,message, new Transaction.Action() { - q.enqueue(message); - } - } - catch (AMQException e) - { - // TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + + public void postCommit() + { + try + { + q.enqueue(message); + } + catch (AMQException e) + { + // TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + }); } + } @@ -171,15 +206,103 @@ public class ServerSession extends Session _messageDispositionListenerMap.remove(method.getId()); } - public void releaseAll() + public void onClose() { for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) { listener.onRelease(); } _messageDispositionListenerMap.clear(); + + for (Task task : _taskList) + { + task.doTask(this); + } + } + public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) + { + _transaction.dequeue(entry.getQueue(), entry.getMessage(), + new Transaction.Action() + { + + public void postCommit() + { + sub.acknowledge(entry); + } + }); + + } + + public Map<String, Subscription_0_10> getSubscriptions() + { + return _subscriptions; + } + + public void register(String destination, Subscription_0_10 sub) + { + _subscriptions.put(destination, sub); + } + public Subscription_0_10 getSubscription(String destination) + { + return _subscriptions.get(destination); + } + + public void unregister(Subscription_0_10 sub) + { + _subscriptions.remove(sub); + try + { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + + } + catch (AMQException e) + { + // TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + finally + { + sub.releaseSendLock(); + } + } + + public void selectTx() + { + _transaction = new LocalTransaction(); + } + + public void commit() + { + _transaction.commit(); + } + + public void rollback() + { + _transaction.rollback(); + } + + void setPrincipal(Principal principal) + { + _principal = principal; + } + + public Principal getPrincipal() + { + return _principal; + } + + public void addSessionCloseTask(Task task) + { + _taskList.add(task); + } + + public void removeSessionCloseTask(Task task) + { + _taskList.remove(task); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 2d224d721a..7ed013db48 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -22,24 +22,31 @@ package org.apache.qpid.server.transport; import org.apache.qpid.transport.*; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.flow.*; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.QueueDeleteOkBody; import java.util.ArrayList; import java.util.Map; -import java.util.HashMap; public class ServerSessionDelegate extends SessionDelegate { private final IApplicationRegistry _appRegistry; - private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>(); public ServerSessionDelegate(IApplicationRegistry appRegistry) { @@ -106,7 +113,7 @@ public class ServerSessionDelegate extends SessionDelegate Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null); - _subscriptions.put(destination, sub); + ((ServerSession)session).register(destination, sub); try { queue.registerSubscription(sub, method.getExclusive()); @@ -115,6 +122,7 @@ public class ServerSessionDelegate extends SessionDelegate { // TODO e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); } } @@ -145,7 +153,9 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { + // TODO e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); } @@ -155,120 +165,117 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageCancel(Session session, MessageCancel method) { - super.messageCancel(session, method); - } - - @Override - public void messageFlush(Session session, MessageFlush method) - { - super.messageFlush(session, method); - } - - @Override - public void txSelect(Session session, TxSelect method) - { - super.txSelect(session, method); - } - - @Override - public void txCommit(Session session, TxCommit method) - { - super.txCommit(session, method); - } - - @Override - public void txRollback(Session session, TxRollback method) - { - super.txRollback(session, method); - } + String destination = method.getDestination(); - @Override - public void dtxSelect(Session session, DtxSelect method) - { - super.dtxSelect(session, method); - } + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - @Override - public void dtxStart(Session session, DtxStart method) - { - super.dtxStart(session, method); + if(sub == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } + else + { + ((ServerSession)session).unregister(sub); + } } @Override - public void dtxEnd(Session session, DtxEnd method) + public void messageFlush(Session session, MessageFlush method) { - super.dtxEnd(session, method); - } + String destination = method.getDestination(); - @Override - public void dtxCommit(Session session, DtxCommit method) - { - super.dtxCommit(session, method); - } + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - @Override - public void dtxForget(Session session, DtxForget method) - { - super.dtxForget(session, method); - } + if(sub == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } + else + { - @Override - public void dtxGetTimeout(Session session, DtxGetTimeout method) - { - super.dtxGetTimeout(session, method); + try + { + sub.flush(); + } + catch (AMQException e) + { + //TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); + } + } } @Override - public void dtxPrepare(Session session, DtxPrepare method) + public void txSelect(Session session, TxSelect method) { - super.dtxPrepare(session, method); + // TODO - check current tx mode + ((ServerSession)session).selectTx(); } @Override - public void dtxRecover(Session session, DtxRecover method) + public void txCommit(Session session, TxCommit method) { - super.dtxRecover(session, method); + // TODO - check current tx mode + ((ServerSession)session).commit(); } @Override - public void dtxRollback(Session session, DtxRollback method) + public void txRollback(Session session, TxRollback method) { - super.dtxRollback(session, method); + // TODO - check current tx mode + ((ServerSession)session).rollback(); } - @Override - public void dtxSetTimeout(Session session, DtxSetTimeout method) - { - super.dtxSetTimeout(session, method); - } @Override public void exchangeDeclare(Session session, ExchangeDeclare method) { String exchangeName = method.getExchange(); - + VirtualHost virtualHost = getVirtualHost(session); Exchange exchange = getExchange(session, exchangeName); if(method.getPassive()) { if(exchange == null) { - ExecutionException ex = new ExecutionException(); - ex.setErrorCode(ExecutionErrorCode.NOT_FOUND); - ex.setCommandId(method.getId()); - - ex.setDescription("not-found: exchange-name '"+exchangeName+"'"); + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'"); - session.invoke(ex); - session.close(); + // TODO - control flow + return; } } else { + if (!virtualHost.getAccessManager().authoriseCreateExchange((ServerSession)session, method.getAutoDelete(), + method.getDurable(), new AMQShortString(method.getExchange()), false, false, method.getPassive(), + new AMQShortString(method.getType()))) + { + + ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; + String description = "permission denied: exchange-name '" + exchangeName + "'"; + + exception(session, method, errorCode, description); + + // TODO - Control Flow + return; + } + + // TODO } - super.exchangeDeclare(session, method); + } + + private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description) + { + ExecutionException ex = new ExecutionException(); + ex.setErrorCode(errorCode); + ex.setCommandId(method.getId()); + ex.setDescription(description); + + session.invoke(ex); + session.close(); } private Exchange getExchange(Session session, String exchangeName) @@ -307,7 +314,6 @@ public class ServerSessionDelegate extends SessionDelegate public void exchangeQuery(Session session, ExchangeQuery method) { super.exchangeQuery(session, method); - } @Override @@ -326,7 +332,6 @@ public class ServerSessionDelegate extends SessionDelegate public void exchangeBound(Session session, ExchangeBound method) { - ExchangeBoundResult result = new ExchangeBoundResult(); if(method.hasExchange()) { @@ -416,19 +421,246 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void queueDeclare(Session session, QueueDeclare method) { - super.queueDeclare(session, method); + + VirtualHost virtualHost = getVirtualHost(session); + + String queueName = method.getQueue(); + + if (!method.getPassive()) + { + // Perform ACL if request is not passive + + if (!virtualHost.getAccessManager().authoriseCreateQueue(((ServerSession)session), method.getAutoDelete(), method.getDurable(), + method.getExclusive(), false, method.getPassive(), new AMQShortString(queueName))) + { + ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; + String description = "permission denied: queue-name '" + queueName + "'"; + + exception(session, method, errorCode, description); + + // TODO control flow + return; + } + } + + + AMQQueue queue; + QueueRegistry queueRegistry = getQueueRegistry(session); + //TODO: do we need to check that the queue already exists with exactly the same "configuration"? + + synchronized (queueRegistry) + { + + if (((queue = queueRegistry.getQueue(queueName)) == null)) + { + + if (method.getPassive()) + { + String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; + ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; + + exception(session, method, errorCode, description); + + return; + } + else + { + try + { + queue = createQueue(queueName, method, virtualHost, (ServerSession)session); + + if (queue.isDurable() && !queue.isAutoDelete()) + { + //store.createQueue(queue, body.getArguments()); + } + queueRegistry.registerQueue(queue); + boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister(); + + if (autoRegister) + { + ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); + + Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); + + queue.bind(defaultExchange, new AMQShortString(queueName), null); + + } + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); + } + } + } + else if (queue.getOwner() != null && !((ServerSession)session).getPrincipal().getName().equals(queue.getOwner())) + { + + String description = "Cannot declare queue('" + queueName + "')," + + " as exclusive queue with same name " + + "declared on another client ID('" + + queue.getOwner() + "')"; + ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; + + exception(session, method, errorCode, description); + + return; + } + + } + } + + + protected AMQQueue createQueue(final String queueName, + QueueDeclare body, + VirtualHost virtualHost, + final ServerSession session) + throws AMQException + { + final QueueRegistry registry = virtualHost.getQueueRegistry(); + + String owner = body.getExclusive() ? session.getPrincipal().getName() : null; + + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost, + body.getArguments()); + + + if (body.getExclusive() && !body.getDurable()) + { + final ServerSession.Task deleteQueueTask = + new ServerSession.Task() + { + public void doTask(ServerSession session) + { + if (registry.getQueue(queueName) == queue) + { + try + { + queue.delete(); + } + catch (AMQException e) + { + //TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); + } + } + } + }; + + session.addSessionCloseTask(deleteQueueTask); + + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) + { + session.removeSessionCloseTask(deleteQueueTask); + } + }); + }// if exclusive and not durable + + return queue; } @Override public void queueDelete(Session session, QueueDelete method) { - super.queueDelete(session, method); + + String queueName = method.getQueue(); + if(queueName == null || queueName.length()==0) + { + exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied"); + + } + else + { + AMQQueue queue = getQueue(session, queueName); + + + if (queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found"); + } + else + { + if (method.getIfEmpty() && !queue.isEmpty()) + { + exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty"); + } + else if (method.getIfUnused() && !queue.isUnused()) + { + // TODO - Error code + exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " in use"); + + } + else + { + VirtualHost virtualHost = getVirtualHost(session); + + //Perform ACLs + if (!virtualHost.getAccessManager().authoriseDelete(((ServerSession)session), queue)) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot delete queue " + queueName); + } + else + { + try + { + int purged = queue.delete(); + } + catch (AMQException e) + { + //TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); + } + + + /* if (queue.isDurable()) + { + store.removeQueue(queue); + }*/ + } + + } + } + } + } @Override public void queuePurge(Session session, QueuePurge method) { - super.queuePurge(session, method); + String queueName = method.getQueue(); + if(queueName == null || queueName.length()==0) + { + exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied"); + + } + else + { + AMQQueue queue = getQueue(session, queueName); + + + if (queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found"); + } + else + { + //TODO + try + { + queue.clearQueue(new StoreContext()); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); + } + } + } + } @Override @@ -438,44 +670,50 @@ public class ServerSessionDelegate extends SessionDelegate } @Override - public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm) + public void messageSetFlowMode(Session session, MessageSetFlowMode sfm) { String destination = sfm.getDestination(); - Subscription_0_10 sub = _subscriptions.get(destination); + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - // TODO null check + if(sub == null) + { + exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } if(sub.isStopped()) { sub.setFlowMode(sfm.getFlowMode()); } - - - } @Override - public void messageStop(Session ssn, MessageStop stop) + public void messageStop(Session session, MessageStop stop) { String destination = stop.getDestination(); - Subscription_0_10 sub = _subscriptions.get(destination); + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - // TODO null check + if(sub == null) + { + exception(session, stop, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } sub.stop(); } @Override - public void messageFlow(Session ssn, MessageFlow flow) + public void messageFlow(Session session, MessageFlow flow) { String destination = flow.getDestination(); - Subscription_0_10 sub = _subscriptions.get(destination); + Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); - // TODO null check + if(sub == null) + { + exception(session, flow, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + } sub.addCredit(flow.getUnit(), flow.getValue()); @@ -485,10 +723,17 @@ public class ServerSessionDelegate extends SessionDelegate public void closed(Session session) { super.closed(session); - for(Subscription_0_10 sub : _subscriptions.values()) + for(Subscription_0_10 sub : getSubscriptions(session).values()) { sub.close(); - ((ServerSession)session).releaseAll(); } + ((ServerSession)session).onClose(); + ((ServerSession)session).onClose(); + } + + public Map<String, Subscription_0_10> getSubscriptions(Session session) + { + return ((ServerSession)session).getSubscriptions(); } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java new file mode 100755 index 0000000000..78fad3f629 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.message.ServerMessage; + +public class AutoCommitTransaction implements Transaction +{ + + + public void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction) + { + // store.remove enqueue + // store.commit + postCommitAction.postCommit(); + } + + public void enqueue(AMQQueue queue, ServerMessage message, Action postCommitAction) + { + // store.add enqueue + // store.commit + postCommitAction.postCommit(); + } + + public void commit() + { + + } + + public void rollback() + { + + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java new file mode 100755 index 0000000000..92347b9927 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -0,0 +1,50 @@ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.message.ServerMessage; + +import java.util.List; +import java.util.ArrayList; + +public class LocalTransaction implements Transaction +{ + private final List<Action> _postCommitActions = new ArrayList<Action>(); + + public void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction) + { + _postCommitActions.add(postCommitAction); + } + + public void enqueue(AMQQueue queue, ServerMessage message, Action postCommitAction) + { + _postCommitActions.add(postCommitAction); + } + + public void commit() + { + try + { + for(Action action : _postCommitActions) + { + action.postCommit(); + } + } + finally + { + _postCommitActions.clear(); + } + } + + public void rollback() + { + + try + { + + } + finally + { + _postCommitActions.clear(); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java new file mode 100755 index 0000000000..91f429f48e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.message.ServerMessage; + +public interface Transaction +{ + + + public static interface Action + { + public void postCommit(); + } + + void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction); + + void enqueue(AMQQueue queue, ServerMessage message, Action postCommitAction); + + void commit(); + + void rollback(); +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index 99c88fac3e..ad6c95db8e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -186,7 +186,7 @@ public class MockProtocolSession implements AMQProtocolSession //To change body of implemented methods use File | Settings | File Templates. } - public Principal getAuthorizedID() + public Principal getPrincipal() { return null; //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java index f62b0c6241..a964fa931c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java @@ -24,6 +24,7 @@ import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.security.access.plugins.AllowAll; +import org.apache.qpid.server.PrincipalHolder; public class ExchangeDenier extends AllowAll { @@ -42,7 +43,7 @@ public class ExchangeDenier extends AllowAll }; @Override - public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange) + public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange) { return AuthzResult.DENIED; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java index 5497f0ae44..a415d56601 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; import org.apache.qpid.server.security.access.plugins.AllowAll; +import org.apache.qpid.server.PrincipalHolder; public class QueueDenier extends AllowAll { @@ -48,7 +49,7 @@ public class QueueDenier extends AllowAll @Override - public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue) + public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue) { if (!(queue.getName().toString().equals(_queueName))) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 43445232d6..08842c94c0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -84,7 +84,8 @@ public class Connection extends ConnectionInvoker private SaslServer saslServer; private SaslClient saslClient; private long idleTimeout = 0; - + private String _authorizationID; + // want to make this final private int _connectionId; @@ -525,7 +526,17 @@ public class Connection extends ConnectionInvoker { return idleTimeout; } - + + public void setAuthorizationID(String authorizationID) + { + _authorizationID = authorizationID; + } + + public String getAuthorizationID() + { + return _authorizationID; + } + public String toString() { return String.format("conn:%x", System.identityHashCode(this)); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 8caf29ecb5..453921ea2b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -131,6 +131,7 @@ public class ServerDelegate extends ConnectionDelegate (Integer.MAX_VALUE, org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, 0, Integer.MAX_VALUE); + conn.setAuthorizationID(ss.getAuthorizationID()); } else { |