summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-08-26 22:28:43 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-08-26 22:28:43 +0000
commit1ff7ce2b1476bed35829105456065fda0b7bf4ff (patch)
treebd8c484e84150f3971083705f9c3c421981d0c38
parente2fc7fa318d76d1fbcf14c60e43e2ba7cb431bb3 (diff)
downloadqpid-python-1ff7ce2b1476bed35829105456065fda0b7bf4ff.tar.gz
Implement transient transactions, some queue methods, etc
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@808207 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java35
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java51
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java149
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java433
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java53
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java50
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java42
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java15
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java1
28 files changed, 822 insertions, 211 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java
new file mode 100755
index 0000000000..07aa13b10d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import java.security.Principal;
+
+public interface PrincipalHolder
+{
+ /** @return a Principal that was used to authorized this session */
+ Principal getPrincipal();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
index dec0bca576..b021fece57 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
@@ -66,7 +66,9 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
-
+ _bytesCredit = 0l;
+ _messageCredit = 0l;
+ setSuspended(true);
}
@@ -100,6 +102,10 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
}
+ public void clearCredit()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
public synchronized boolean hasCredit()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
index 7b91894526..a3509ee02d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
@@ -201,4 +201,11 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
_messageCreditLimit = -1;
}
}
+
+ public void clearCredit()
+ {
+ _bytesCreditLimit = 0l;
+ _messageCreditLimit = 0l;
+ setSuspended(true);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 205ca73f13..2db16ef751 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -677,7 +677,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public String toString()
{
- return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
+ return _minaProtocolSession.getRemoteAddress() + "(" + (getPrincipal() == null ? "?" : getPrincipal().getName() + ")");
}
public String dump()
@@ -822,7 +822,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_authorizedID = authorizedID;
}
- public Principal getAuthorizedID()
+ public Principal getPrincipal()
{
return _authorizedID;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 1bac601225..1e8dd9f77a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -28,13 +28,14 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.PrincipalHolder;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.security.Principal;
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder
{
public static final class ProtocolSessionIdentifier
@@ -195,9 +196,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
void setAuthorizedID(Principal authorizedID);
- /** @return a Principal that was used to authorized this session */
- Principal getAuthorizedID();
-
public MethodRegistry getMethodRegistry();
public MethodDispatcher getMethodDispatcher();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 65235ba9b9..3e1477fc1c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -129,7 +129,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public String getAuthorizedId()
{
- return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
+ return (_session.getPrincipal() != null ) ? _session.getPrincipal().getName() : null;
}
public String getVersion()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index bb6ce65d42..0339245b04 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -39,6 +39,15 @@ public class AMQPriorityQueue extends SimpleAMQQueue
super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
}
+ public AMQPriorityQueue(String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, int priorities) throws AMQException
+ {
+ this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities);
+ }
+
public int getPriorities()
{
return ((PriorityQueueList) _entries).getPriorities();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 7509350e65..f31fbd6ad0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -26,6 +26,8 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+
public class AMQQueueFactory
{
@@ -83,4 +85,39 @@ public class AMQQueueFactory
q.configure(config);
return q;
}
+
+ public static AMQQueue createAMQQueueImpl(String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, Map<String, Object> arguments)
+ throws AMQException
+ {
+ int priorities = 1;
+ if(arguments.containsKey(X_QPID_PRIORITIES))
+ {
+ Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
+ if(prioritiesObj instanceof Number)
+ {
+ priorities = ((Number)prioritiesObj).intValue();
+ }
+ }
+
+
+ AMQQueue q = null;
+ if(priorities > 1)
+ {
+ q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, virtualHost, priorities);
+ }
+ else
+ {
+ q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, virtualHost);
+ }
+
+ //Register the new queue
+ virtualHost.getQueueRegistry().registerQueue(q);
+ q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
+ return q;
+
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 5babc3e3d4..8553f8bf16 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
@@ -188,7 +187,7 @@ public class IncomingMessage implements Filterable, InboundMessage
AMQShortString userID = getContentHeader().properties instanceof BasicContentHeaderProperties ?
((BasicContentHeaderProperties) getContentHeader().properties).getUserId() : null;
- if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
+ if (MSG_AUTH && !_publisher.getPrincipal().getName().equals(userID == null? "" : userID.toString()))
{
throw new UnauthorizedAccessException("Acccess Refused",message);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 3d694b7137..86d1411450 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -164,6 +164,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost)
+ throws AMQException
+ {
+ this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost);
+ }
+
public void resetNotifications()
{
// This ensure that the notification checks for the configured alerts are created.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
index 6f7f66fad2..c53403d0df 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.security.access.plugins.SimpleXML;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.PrincipalHolder;
public class ACLManager
{
@@ -152,7 +153,7 @@ public class ACLManager
return true;
}
- public boolean authoriseBind(final AMQProtocolSession session, final Exchange exch, final AMQQueue queue,
+ public boolean authoriseBind(final PrincipalHolder session, final Exchange exch, final AMQQueue queue,
final AMQShortString routingKey)
{
return checkAllPlugins(new AccessCheck()
@@ -167,7 +168,7 @@ public class ACLManager
});
}
- public boolean authoriseConnect(final AMQProtocolSession session, final VirtualHost virtualHost)
+ public boolean authoriseConnect(final PrincipalHolder session, final VirtualHost virtualHost)
{
return checkAllPlugins(new AccessCheck()
{
@@ -181,7 +182,7 @@ public class ACLManager
});
}
- public boolean authoriseConsume(final AMQProtocolSession session, final boolean noAck, final AMQQueue queue)
+ public boolean authoriseConsume(final PrincipalHolder session, final boolean noAck, final AMQQueue queue)
{
return checkAllPlugins(new AccessCheck()
{
@@ -195,7 +196,7 @@ public class ACLManager
});
}
- public boolean authoriseConsume(final AMQProtocolSession session, final boolean exclusive, final boolean noAck,
+ public boolean authoriseConsume(final PrincipalHolder session, final boolean exclusive, final boolean noAck,
final boolean noLocal, final boolean nowait, final AMQQueue queue)
{
return checkAllPlugins(new AccessCheck()
@@ -210,7 +211,7 @@ public class ACLManager
});
}
- public boolean authoriseCreateExchange(final AMQProtocolSession session, final boolean autoDelete,
+ public boolean authoriseCreateExchange(final PrincipalHolder session, final boolean autoDelete,
final boolean durable, final AMQShortString exchangeName, final boolean internal, final boolean nowait,
final boolean passive, final AMQShortString exchangeType)
{
@@ -227,7 +228,7 @@ public class ACLManager
});
}
- public boolean authoriseCreateQueue(final AMQProtocolSession session, final boolean autoDelete,
+ public boolean authoriseCreateQueue(final PrincipalHolder session, final boolean autoDelete,
final boolean durable, final boolean exclusive, final boolean nowait, final boolean passive,
final AMQShortString queue)
{
@@ -243,7 +244,7 @@ public class ACLManager
});
}
- public boolean authoriseDelete(final AMQProtocolSession session, final AMQQueue queue)
+ public boolean authoriseDelete(final PrincipalHolder session, final AMQQueue queue)
{
return checkAllPlugins(new AccessCheck()
{
@@ -257,7 +258,7 @@ public class ACLManager
});
}
- public boolean authoriseDelete(final AMQProtocolSession session, final Exchange exchange)
+ public boolean authoriseDelete(final PrincipalHolder session, final Exchange exchange)
{
return checkAllPlugins(new AccessCheck()
{
@@ -271,7 +272,7 @@ public class ACLManager
});
}
- public boolean authorisePublish(final AMQProtocolSession session, final boolean immediate, final boolean mandatory,
+ public boolean authorisePublish(final PrincipalHolder session, final boolean immediate, final boolean mandatory,
final AMQShortString routingKey, final Exchange e)
{
return checkAllPlugins(new AccessCheck()
@@ -286,7 +287,7 @@ public class ACLManager
});
}
- public boolean authorisePurge(final AMQProtocolSession session, final AMQQueue queue)
+ public boolean authorisePurge(final PrincipalHolder session, final AMQQueue queue)
{
return checkAllPlugins(new AccessCheck()
{
@@ -300,7 +301,7 @@ public class ACLManager
});
}
- public boolean authoriseUnbind(final AMQProtocolSession session, final Exchange exch,
+ public boolean authoriseUnbind(final PrincipalHolder session, final Exchange exch,
final AMQShortString routingKey, final AMQQueue queue)
{
return checkAllPlugins(new AccessCheck()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java
index 032184ec39..372a9f37a1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.PrincipalHolder;
public interface ACLPlugin
{
@@ -41,30 +42,30 @@ public interface ACLPlugin
// These return true if the plugin thinks the action should be allowed, and false if not.
- AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue, AMQShortString routingKey);
+ AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey);
- AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable,
+ AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable,
AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType);
- AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable, boolean exclusive,
+ AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable, boolean exclusive,
boolean nowait, boolean passive, AMQShortString queue);
- AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost);
+ AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost);
- AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue);
+ AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue);
- AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal,
+ AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal,
boolean nowait, AMQQueue queue);
- AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue);
+ AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue);
- AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange);
+ AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange);
- AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory,
+ AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory,
AMQShortString routingKey, Exchange e);
- AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue);
+ AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue);
- AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey, AMQQueue queue);
+ AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey, AMQQueue queue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java
index 682135bc25..a66e4f3684 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.PrincipalHolder;
/**
* This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations.
@@ -35,63 +36,63 @@ public abstract class AbstractACLPlugin implements ACLPlugin
private static final AuthzResult DEFAULT_ANSWER = AuthzResult.ABSTAIN;
- public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue,
+ public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue,
AMQShortString routingKey)
{
return DEFAULT_ANSWER;
}
- public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
+ public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost)
{
return DEFAULT_ANSWER;
}
- public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue)
+ public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue)
{
return DEFAULT_ANSWER;
}
- public AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal,
+ public AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal,
boolean nowait, AMQQueue queue)
{
return DEFAULT_ANSWER;
}
- public AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable,
+ public AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable,
AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType)
{
// TODO Auto-generated method stub
return null;
}
- public AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable,
+ public AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable,
boolean exclusive, boolean nowait, boolean passive, AMQShortString queue)
{
return DEFAULT_ANSWER;
}
- public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+ public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
{
return DEFAULT_ANSWER;
}
- public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+ public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
{
return DEFAULT_ANSWER;
}
- public AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory,
+ public AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory,
AMQShortString routingKey, Exchange e)
{
return DEFAULT_ANSWER;
}
- public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue)
+ public AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue)
{
return DEFAULT_ANSWER;
}
- public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey,
+ public AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey,
AMQQueue queue)
{
return DEFAULT_ANSWER;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
index f7e537b02b..5f52805414 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.PrincipalHolder;
public abstract class BasicACLPlugin implements ACLPlugin
{
@@ -35,37 +36,32 @@ public abstract class BasicACLPlugin implements ACLPlugin
// Returns true or false if the plugin should authorise or deny the request
protected abstract AuthzResult getResult();
- @Override
- public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch,
+ public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch,
AMQQueue queue, AMQShortString routingKey)
{
return getResult();
}
- @Override
- public AuthzResult authoriseConnect(AMQProtocolSession session,
+ public AuthzResult authoriseConnect(PrincipalHolder session,
VirtualHost virtualHost)
{
return getResult();
}
- @Override
- public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck,
+ public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck,
AMQQueue queue)
{
return getResult();
}
- @Override
- public AuthzResult authoriseConsume(AMQProtocolSession session,
+ public AuthzResult authoriseConsume(PrincipalHolder session,
boolean exclusive, boolean noAck, boolean noLocal, boolean nowait,
AMQQueue queue)
{
return getResult();
}
- @Override
- public AuthzResult authoriseCreateExchange(AMQProtocolSession session,
+ public AuthzResult authoriseCreateExchange(PrincipalHolder session,
boolean autoDelete, boolean durable, AMQShortString exchangeName,
boolean internal, boolean nowait, boolean passive,
AMQShortString exchangeType)
@@ -73,48 +69,41 @@ public abstract class BasicACLPlugin implements ACLPlugin
return getResult();
}
- @Override
- public AuthzResult authoriseCreateQueue(AMQProtocolSession session,
+ public AuthzResult authoriseCreateQueue(PrincipalHolder session,
boolean autoDelete, boolean durable, boolean exclusive,
boolean nowait, boolean passive, AMQShortString queue)
{
return getResult();
}
- @Override
- public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+ public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
{
return getResult();
}
- @Override
- public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+ public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
{
return getResult();
}
- @Override
- public AuthzResult authorisePublish(AMQProtocolSession session,
+ public AuthzResult authorisePublish(PrincipalHolder session,
boolean immediate, boolean mandatory, AMQShortString routingKey,
Exchange e)
{
return getResult();
}
- @Override
- public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue)
+ public AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue)
{
return getResult();
}
- @Override
- public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch,
+ public AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch,
AMQShortString routingKey, AMQQueue queue)
{
return getResult();
}
- @Override
public void setConfiguration(Configuration config)
{
// no-op
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
index 26a76c9af1..77d3c4bcdf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
@@ -54,7 +54,7 @@ public class DenyAll extends BasicACLPlugin
if (ACLManager.getLogger().isInfoEnabled())
{
ACLManager.getLogger().info(
- "Denying user:" + session.getAuthorizedID());
+ "Denying user:" + session.getPrincipal());
}
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
"DenyAll Plugin");
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
index 2cc0c530de..3f8ab8d9e6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
@@ -22,25 +22,18 @@
package org.apache.qpid.server.security.access.plugins;
import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLPluginFactory;
import org.apache.qpid.server.security.access.AccessResult;
import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.security.access.PrincipalPermissions;
-import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.PrincipalHolder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -290,9 +283,9 @@ public class SimpleXML implements ACLPlugin
return "Simple";
}
- public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue, AMQShortString routingKey)
+ public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
@@ -303,9 +296,9 @@ public class SimpleXML implements ACLPlugin
}
}
- public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
+ public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
@@ -316,9 +309,9 @@ public class SimpleXML implements ACLPlugin
}
}
- public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue)
+ public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
@@ -329,16 +322,16 @@ public class SimpleXML implements ACLPlugin
}
}
- public AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal,
+ public AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal,
boolean nowait, AMQQueue queue)
{
return authoriseConsume(session, noAck, queue);
}
- public AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable,
+ public AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable,
AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
@@ -349,10 +342,10 @@ public class SimpleXML implements ACLPlugin
}
}
- public AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable, boolean exclusive,
+ public AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable, boolean exclusive,
boolean nowait, boolean passive, AMQShortString queue)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
@@ -363,9 +356,9 @@ public class SimpleXML implements ACLPlugin
}
}
- public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+ public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
@@ -376,9 +369,9 @@ public class SimpleXML implements ACLPlugin
}
}
- public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+ public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
@@ -389,10 +382,10 @@ public class SimpleXML implements ACLPlugin
}
}
- public AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory,
+ public AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory,
AMQShortString routingKey, Exchange e)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
@@ -403,9 +396,9 @@ public class SimpleXML implements ACLPlugin
}
}
- public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue)
+ public AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
@@ -416,9 +409,9 @@ public class SimpleXML implements ACLPlugin
}
}
- public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey, AMQQueue queue)
+ public AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey, AMQQueue queue)
{
- PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+ PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
if (principalPermissions == null)
{
return AuthzResult.DENIED;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
index 810be8ae22..438bd696c2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLPluginFactory;
import org.apache.qpid.server.security.access.plugins.AbstractACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.PrincipalHolder;
import org.apache.qpid.util.NetMatcher;
public class FirewallPlugin extends AbstractACLPlugin
@@ -178,7 +179,7 @@ public class FirewallPlugin extends AbstractACLPlugin
private FirewallRule[] _rules;
@Override
- public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
+ public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost)
{
if (!(session instanceof AMQMinaProtocolSession))
{
@@ -226,7 +227,6 @@ public class FirewallPlugin extends AbstractACLPlugin
}
}
- @Override
public void setConfiguration(Configuration config) throws ConfigurationException
{
// Get default action
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 2004478ed4..5cecf0cd05 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -311,7 +311,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
public void onAccept()
{
- acknowledge(entry);
+ _session.acknowledge(Subscription_0_10.this,entry);
}
public void onRelease()
@@ -432,6 +432,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
}
_stopped.set(true);
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+ creditManager.clearCredit();
}
public void addCredit(MessageCreditUnit unit, long value)
@@ -507,4 +509,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
+
+ public void flush() throws AMQException
+ {
+ _queue.flushSubscription(this);
+ stop();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index bbc2396063..8fc426a6f6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -23,13 +23,24 @@ package org.apache.qpid.server.transport;
import org.apache.qpid.transport.*;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.security.Principal;
+
import static org.apache.qpid.util.Serial.*;
+import com.sun.security.auth.UserPrincipal;
-public class ServerSession extends Session
+public class ServerSession extends Session implements PrincipalHolder
{
@@ -42,36 +53,60 @@ public class ServerSession extends Session
public void onReject();
}
+ public static interface Task
+ {
+ public void doTask(ServerSession session);
+ }
+
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
+ private Transaction _transaction;
+
+ private Principal _principal;
+
+ private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>();
+
+ private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+
+
ServerSession(Connection connection, Binary name, long expiry)
{
super(connection, name, expiry);
+ _transaction = new AutoCommitTransaction();
+ _principal = new UserPrincipal(connection.getAuthorizationID());
}
ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
super(connection, delegate, name, expiry);
+ _transaction = new AutoCommitTransaction();
}
- public void enqueue(ServerMessage message, ArrayList<AMQQueue> queues)
+ public void enqueue(final ServerMessage message, ArrayList<AMQQueue> queues)
{
- // TODO Txn
- try
+ for(final AMQQueue q : queues)
{
- for(AMQQueue q : queues)
+ _transaction.enqueue(q,message, new Transaction.Action()
{
- q.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- // TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+
+ public void postCommit()
+ {
+ try
+ {
+ q.enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ });
}
+
}
@@ -171,15 +206,103 @@ public class ServerSession extends Session
_messageDispositionListenerMap.remove(method.getId());
}
- public void releaseAll()
+ public void onClose()
{
for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
{
listener.onRelease();
}
_messageDispositionListenerMap.clear();
+
+ for (Task task : _taskList)
+ {
+ task.doTask(this);
+ }
+
}
+ public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
+ {
+ _transaction.dequeue(entry.getQueue(), entry.getMessage(),
+ new Transaction.Action()
+ {
+
+ public void postCommit()
+ {
+ sub.acknowledge(entry);
+ }
+ });
+
+ }
+
+ public Map<String, Subscription_0_10> getSubscriptions()
+ {
+ return _subscriptions;
+ }
+
+ public void register(String destination, Subscription_0_10 sub)
+ {
+ _subscriptions.put(destination, sub);
+ }
+ public Subscription_0_10 getSubscription(String destination)
+ {
+ return _subscriptions.get(destination);
+ }
+
+ public void unregister(Subscription_0_10 sub)
+ {
+ _subscriptions.remove(sub);
+ try
+ {
+ sub.getSendLock();
+ sub.getQueue().unregisterSubscription(sub);
+
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+ }
+
+ public void selectTx()
+ {
+ _transaction = new LocalTransaction();
+ }
+
+ public void commit()
+ {
+ _transaction.commit();
+ }
+
+ public void rollback()
+ {
+ _transaction.rollback();
+ }
+
+ void setPrincipal(Principal principal)
+ {
+ _principal = principal;
+ }
+
+ public Principal getPrincipal()
+ {
+ return _principal;
+ }
+
+ public void addSessionCloseTask(Task task)
+ {
+ _taskList.add(task);
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ _taskList.remove(task);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 2d224d721a..7ed013db48 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -22,24 +22,31 @@ package org.apache.qpid.server.transport;
import org.apache.qpid.transport.*;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.flow.*;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.QueueDeleteOkBody;
import java.util.ArrayList;
import java.util.Map;
-import java.util.HashMap;
public class ServerSessionDelegate extends SessionDelegate
{
private final IApplicationRegistry _appRegistry;
- private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>();
public ServerSessionDelegate(IApplicationRegistry appRegistry)
{
@@ -106,7 +113,7 @@ public class ServerSessionDelegate extends SessionDelegate
Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
- _subscriptions.put(destination, sub);
+ ((ServerSession)session).register(destination, sub);
try
{
queue.registerSubscription(sub, method.getExclusive());
@@ -115,6 +122,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
// TODO
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
}
}
@@ -145,7 +153,9 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (AMQException e)
{
+ // TODO
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
}
@@ -155,120 +165,117 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageCancel(Session session, MessageCancel method)
{
- super.messageCancel(session, method);
- }
-
- @Override
- public void messageFlush(Session session, MessageFlush method)
- {
- super.messageFlush(session, method);
- }
-
- @Override
- public void txSelect(Session session, TxSelect method)
- {
- super.txSelect(session, method);
- }
-
- @Override
- public void txCommit(Session session, TxCommit method)
- {
- super.txCommit(session, method);
- }
-
- @Override
- public void txRollback(Session session, TxRollback method)
- {
- super.txRollback(session, method);
- }
+ String destination = method.getDestination();
- @Override
- public void dtxSelect(Session session, DtxSelect method)
- {
- super.dtxSelect(session, method);
- }
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
- @Override
- public void dtxStart(Session session, DtxStart method)
- {
- super.dtxStart(session, method);
+ if(sub == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
+ else
+ {
+ ((ServerSession)session).unregister(sub);
+ }
}
@Override
- public void dtxEnd(Session session, DtxEnd method)
+ public void messageFlush(Session session, MessageFlush method)
{
- super.dtxEnd(session, method);
- }
+ String destination = method.getDestination();
- @Override
- public void dtxCommit(Session session, DtxCommit method)
- {
- super.dtxCommit(session, method);
- }
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
- @Override
- public void dtxForget(Session session, DtxForget method)
- {
- super.dtxForget(session, method);
- }
+ if(sub == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
+ else
+ {
- @Override
- public void dtxGetTimeout(Session session, DtxGetTimeout method)
- {
- super.dtxGetTimeout(session, method);
+ try
+ {
+ sub.flush();
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+ }
}
@Override
- public void dtxPrepare(Session session, DtxPrepare method)
+ public void txSelect(Session session, TxSelect method)
{
- super.dtxPrepare(session, method);
+ // TODO - check current tx mode
+ ((ServerSession)session).selectTx();
}
@Override
- public void dtxRecover(Session session, DtxRecover method)
+ public void txCommit(Session session, TxCommit method)
{
- super.dtxRecover(session, method);
+ // TODO - check current tx mode
+ ((ServerSession)session).commit();
}
@Override
- public void dtxRollback(Session session, DtxRollback method)
+ public void txRollback(Session session, TxRollback method)
{
- super.dtxRollback(session, method);
+ // TODO - check current tx mode
+ ((ServerSession)session).rollback();
}
- @Override
- public void dtxSetTimeout(Session session, DtxSetTimeout method)
- {
- super.dtxSetTimeout(session, method);
- }
@Override
public void exchangeDeclare(Session session, ExchangeDeclare method)
{
String exchangeName = method.getExchange();
-
+ VirtualHost virtualHost = getVirtualHost(session);
Exchange exchange = getExchange(session, exchangeName);
if(method.getPassive())
{
if(exchange == null)
{
- ExecutionException ex = new ExecutionException();
- ex.setErrorCode(ExecutionErrorCode.NOT_FOUND);
- ex.setCommandId(method.getId());
-
- ex.setDescription("not-found: exchange-name '"+exchangeName+"'");
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'");
- session.invoke(ex);
- session.close();
+ // TODO - control flow
+ return;
}
}
else
{
+ if (!virtualHost.getAccessManager().authoriseCreateExchange((ServerSession)session, method.getAutoDelete(),
+ method.getDurable(), new AMQShortString(method.getExchange()), false, false, method.getPassive(),
+ new AMQShortString(method.getType())))
+ {
+
+ ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
+ String description = "permission denied: exchange-name '" + exchangeName + "'";
+
+ exception(session, method, errorCode, description);
+
+ // TODO - Control Flow
+ return;
+ }
+
+
// TODO
}
- super.exchangeDeclare(session, method);
+ }
+
+ private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description)
+ {
+ ExecutionException ex = new ExecutionException();
+ ex.setErrorCode(errorCode);
+ ex.setCommandId(method.getId());
+ ex.setDescription(description);
+
+ session.invoke(ex);
+ session.close();
}
private Exchange getExchange(Session session, String exchangeName)
@@ -307,7 +314,6 @@ public class ServerSessionDelegate extends SessionDelegate
public void exchangeQuery(Session session, ExchangeQuery method)
{
super.exchangeQuery(session, method);
-
}
@Override
@@ -326,7 +332,6 @@ public class ServerSessionDelegate extends SessionDelegate
public void exchangeBound(Session session, ExchangeBound method)
{
-
ExchangeBoundResult result = new ExchangeBoundResult();
if(method.hasExchange())
{
@@ -416,19 +421,246 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void queueDeclare(Session session, QueueDeclare method)
{
- super.queueDeclare(session, method);
+
+ VirtualHost virtualHost = getVirtualHost(session);
+
+ String queueName = method.getQueue();
+
+ if (!method.getPassive())
+ {
+ // Perform ACL if request is not passive
+
+ if (!virtualHost.getAccessManager().authoriseCreateQueue(((ServerSession)session), method.getAutoDelete(), method.getDurable(),
+ method.getExclusive(), false, method.getPassive(), new AMQShortString(queueName)))
+ {
+ ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
+ String description = "permission denied: queue-name '" + queueName + "'";
+
+ exception(session, method, errorCode, description);
+
+ // TODO control flow
+ return;
+ }
+ }
+
+
+ AMQQueue queue;
+ QueueRegistry queueRegistry = getQueueRegistry(session);
+ //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+ synchronized (queueRegistry)
+ {
+
+ if (((queue = queueRegistry.getQueue(queueName)) == null))
+ {
+
+ if (method.getPassive())
+ {
+ String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND;
+
+ exception(session, method, errorCode, description);
+
+ return;
+ }
+ else
+ {
+ try
+ {
+ queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
+
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ //store.createQueue(queue, body.getArguments());
+ }
+ queueRegistry.registerQueue(queue);
+ boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
+
+ if (autoRegister)
+ {
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+
+ Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
+
+ queue.bind(defaultExchange, new AMQShortString(queueName), null);
+
+ }
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ else if (queue.getOwner() != null && !((ServerSession)session).getPrincipal().getName().equals(queue.getOwner()))
+ {
+
+ String description = "Cannot declare queue('" + queueName + "'),"
+ + " as exclusive queue with same name "
+ + "declared on another client ID('"
+ + queue.getOwner() + "')";
+ ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
+
+ exception(session, method, errorCode, description);
+
+ return;
+ }
+
+ }
+ }
+
+
+ protected AMQQueue createQueue(final String queueName,
+ QueueDeclare body,
+ VirtualHost virtualHost,
+ final ServerSession session)
+ throws AMQException
+ {
+ final QueueRegistry registry = virtualHost.getQueueRegistry();
+
+ String owner = body.getExclusive() ? session.getPrincipal().getName() : null;
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost,
+ body.getArguments());
+
+
+ if (body.getExclusive() && !body.getDurable())
+ {
+ final ServerSession.Task deleteQueueTask =
+ new ServerSession.Task()
+ {
+ public void doTask(ServerSession session)
+ {
+ if (registry.getQueue(queueName) == queue)
+ {
+ try
+ {
+ queue.delete();
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ };
+
+ session.addSessionCloseTask(deleteQueueTask);
+
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue)
+ {
+ session.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+ }// if exclusive and not durable
+
+ return queue;
}
@Override
public void queueDelete(Session session, QueueDelete method)
{
- super.queueDelete(session, method);
+
+ String queueName = method.getQueue();
+ if(queueName == null || queueName.length()==0)
+ {
+ exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied");
+
+ }
+ else
+ {
+ AMQQueue queue = getQueue(session, queueName);
+
+
+ if (queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found");
+ }
+ else
+ {
+ if (method.getIfEmpty() && !queue.isEmpty())
+ {
+ exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty");
+ }
+ else if (method.getIfUnused() && !queue.isUnused())
+ {
+ // TODO - Error code
+ exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " in use");
+
+ }
+ else
+ {
+ VirtualHost virtualHost = getVirtualHost(session);
+
+ //Perform ACLs
+ if (!virtualHost.getAccessManager().authoriseDelete(((ServerSession)session), queue))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot delete queue " + queueName);
+ }
+ else
+ {
+ try
+ {
+ int purged = queue.delete();
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+
+
+ /* if (queue.isDurable())
+ {
+ store.removeQueue(queue);
+ }*/
+ }
+
+ }
+ }
+ }
+
}
@Override
public void queuePurge(Session session, QueuePurge method)
{
- super.queuePurge(session, method);
+ String queueName = method.getQueue();
+ if(queueName == null || queueName.length()==0)
+ {
+ exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied");
+
+ }
+ else
+ {
+ AMQQueue queue = getQueue(session, queueName);
+
+
+ if (queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "No queue " + queueName + " found");
+ }
+ else
+ {
+ //TODO
+ try
+ {
+ queue.clearQueue(new StoreContext());
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
}
@Override
@@ -438,44 +670,50 @@ public class ServerSessionDelegate extends SessionDelegate
}
@Override
- public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
+ public void messageSetFlowMode(Session session, MessageSetFlowMode sfm)
{
String destination = sfm.getDestination();
- Subscription_0_10 sub = _subscriptions.get(destination);
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
- // TODO null check
+ if(sub == null)
+ {
+ exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
if(sub.isStopped())
{
sub.setFlowMode(sfm.getFlowMode());
}
-
-
-
}
@Override
- public void messageStop(Session ssn, MessageStop stop)
+ public void messageStop(Session session, MessageStop stop)
{
String destination = stop.getDestination();
- Subscription_0_10 sub = _subscriptions.get(destination);
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
- // TODO null check
+ if(sub == null)
+ {
+ exception(session, stop, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
sub.stop();
}
@Override
- public void messageFlow(Session ssn, MessageFlow flow)
+ public void messageFlow(Session session, MessageFlow flow)
{
String destination = flow.getDestination();
- Subscription_0_10 sub = _subscriptions.get(destination);
+ Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
- // TODO null check
+ if(sub == null)
+ {
+ exception(session, flow, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ }
sub.addCredit(flow.getUnit(), flow.getValue());
@@ -485,10 +723,17 @@ public class ServerSessionDelegate extends SessionDelegate
public void closed(Session session)
{
super.closed(session);
- for(Subscription_0_10 sub : _subscriptions.values())
+ for(Subscription_0_10 sub : getSubscriptions(session).values())
{
sub.close();
- ((ServerSession)session).releaseAll();
}
+ ((ServerSession)session).onClose();
+ ((ServerSession)session).onClose();
+ }
+
+ public Map<String, Subscription_0_10> getSubscriptions(Session session)
+ {
+ return ((ServerSession)session).getSubscriptions();
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
new file mode 100755
index 0000000000..78fad3f629
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.ServerMessage;
+
+public class AutoCommitTransaction implements Transaction
+{
+
+
+ public void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction)
+ {
+ // store.remove enqueue
+ // store.commit
+ postCommitAction.postCommit();
+ }
+
+ public void enqueue(AMQQueue queue, ServerMessage message, Action postCommitAction)
+ {
+ // store.add enqueue
+ // store.commit
+ postCommitAction.postCommit();
+ }
+
+ public void commit()
+ {
+
+ }
+
+ public void rollback()
+ {
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
new file mode 100755
index 0000000000..92347b9927
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.ServerMessage;
+
+import java.util.List;
+import java.util.ArrayList;
+
+public class LocalTransaction implements Transaction
+{
+ private final List<Action> _postCommitActions = new ArrayList<Action>();
+
+ public void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction)
+ {
+ _postCommitActions.add(postCommitAction);
+ }
+
+ public void enqueue(AMQQueue queue, ServerMessage message, Action postCommitAction)
+ {
+ _postCommitActions.add(postCommitAction);
+ }
+
+ public void commit()
+ {
+ try
+ {
+ for(Action action : _postCommitActions)
+ {
+ action.postCommit();
+ }
+ }
+ finally
+ {
+ _postCommitActions.clear();
+ }
+ }
+
+ public void rollback()
+ {
+
+ try
+ {
+
+ }
+ finally
+ {
+ _postCommitActions.clear();
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
new file mode 100755
index 0000000000..91f429f48e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.ServerMessage;
+
+public interface Transaction
+{
+
+
+ public static interface Action
+ {
+ public void postCommit();
+ }
+
+ void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction);
+
+ void enqueue(AMQQueue queue, ServerMessage message, Action postCommitAction);
+
+ void commit();
+
+ void rollback();
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 99c88fac3e..ad6c95db8e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -186,7 +186,7 @@ public class MockProtocolSession implements AMQProtocolSession
//To change body of implemented methods use File | Settings | File Templates.
}
- public Principal getAuthorizedID()
+ public Principal getPrincipal()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
index f62b0c6241..a964fa931c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
@@ -24,6 +24,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.PrincipalHolder;
public class ExchangeDenier extends AllowAll
{
@@ -42,7 +43,7 @@ public class ExchangeDenier extends AllowAll
};
@Override
- public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+ public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
{
return AuthzResult.DENIED;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
index 5497f0ae44..a415d56601 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.PrincipalHolder;
public class QueueDenier extends AllowAll
{
@@ -48,7 +49,7 @@ public class QueueDenier extends AllowAll
@Override
- public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+ public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
{
if (!(queue.getName().toString().equals(_queueName)))
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 43445232d6..08842c94c0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -84,7 +84,8 @@ public class Connection extends ConnectionInvoker
private SaslServer saslServer;
private SaslClient saslClient;
private long idleTimeout = 0;
-
+ private String _authorizationID;
+
// want to make this final
private int _connectionId;
@@ -525,7 +526,17 @@ public class Connection extends ConnectionInvoker
{
return idleTimeout;
}
-
+
+ public void setAuthorizationID(String authorizationID)
+ {
+ _authorizationID = authorizationID;
+ }
+
+ public String getAuthorizationID()
+ {
+ return _authorizationID;
+ }
+
public String toString()
{
return String.format("conn:%x", System.identityHashCode(this));
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 8caf29ecb5..453921ea2b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -131,6 +131,7 @@ public class ServerDelegate extends ConnectionDelegate
(Integer.MAX_VALUE,
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
0, Integer.MAX_VALUE);
+ conn.setAuthorizationID(ss.getAuthorizationID());
}
else
{