From e8d4cdead60d7247dea47b4b241255c4d6dfc094 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 7 May 2010 15:09:42 +0000 Subject: QPID-2575 : Create Connection and Session models to correctly expose the Owning Session. Addressed issue where getPrincipal was used in error to identify queue owner. Session model now allows access to this in a protocol independent way. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@942101 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 14 +++++- .../server/handler/BasicConsumeMethodHandler.java | 31 +++++++------ .../qpid/server/handler/BasicGetMethodHandler.java | 25 ++++++---- .../qpid/server/handler/QueueBindHandler.java | 23 +++++---- .../qpid/server/handler/QueueDeclareHandler.java | 54 +++++++++++++--------- .../qpid/server/handler/QueueDeleteHandler.java | 16 ++++--- .../qpid/server/handler/QueuePurgeHandler.java | 20 ++++---- .../qpid/server/protocol/AMQConnectionModel.java | 38 +++++++++++++++ .../qpid/server/protocol/AMQProtocolEngine.java | 19 +++++++- .../qpid/server/protocol/AMQProtocolSession.java | 4 +- .../qpid/server/protocol/AMQSessionModel.java | 28 +++++++++++ .../org/apache/qpid/server/queue/AMQQueue.java | 6 ++- .../apache/qpid/server/queue/SimpleAMQQueue.java | 8 ++-- .../qpid/server/transport/ServerConnection.java | 26 ++++++++++- .../qpid/server/transport/ServerSession.java | 17 ++++++- .../server/transport/ServerSessionDelegate.java | 3 +- .../org/apache/qpid/server/queue/MockAMQQueue.java | 7 +-- 17 files changed, 251 insertions(+), 88 deletions(-) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 454b731e5f..573fa9d966 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -56,6 +56,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.IncomingMessage; @@ -86,7 +88,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -public class AMQChannel implements SessionConfig +public class AMQChannel implements SessionConfig, AMQSessionModel { public static final int DEFAULT_PREFETCH = 5000; @@ -1058,6 +1060,16 @@ public class AMQChannel implements SessionConfig } + public Object getID() + { + return _channelId; + } + + public AMQConnectionModel getConnectionModel() + { + return _session; + } + private class MessageDeliveryAction implements ServerTransaction.Action { private IncomingMessage _incommingMessage; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index d806f9426a..50019090d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; @@ -48,14 +49,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); + AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); + VirtualHost virtualHost = protocolConnection.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); @@ -70,7 +71,7 @@ public class QueueBindHandler implements StateAwareMethodListener if (body.getQueue() == null) { - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = protocolConnection.getChannel(channelId); if (channel == null) { @@ -114,15 +115,19 @@ public class QueueBindHandler implements StateAwareMethodListener { //Perform ACLs - if (!virtualHost.getAccessManager().authoriseBind(session, exch, + if (!virtualHost.getAccessManager().authoriseBind(protocolConnection, exch, queue, routingKey)) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } - else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + else if (queue.isExclusive() && !queue.isDurable()) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); + AMQSessionModel session = queue.getExclusiveOwningSession(); + if (session == null || session.getConnectionModel() != protocolConnection) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); + } } if (!exch.isBound(routingKey, body.getArguments(), queue)) @@ -153,9 +158,9 @@ public class QueueBindHandler implements StateAwareMethodListener } if (!body.getNowait()) { - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + protocolConnection.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 5d5bd761c7..961a165877 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueRegistry; @@ -61,8 +62,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener, ExchangeRefer PrincipalHolder getPrincipalHolder(); void setPrincipalHolder(PrincipalHolder principalHolder); - void setExclusiveOwner(Object owner); - Object getExclusiveOwner(); + void setExclusiveOwningSession(AMQSessionModel owner); + AMQSessionModel getExclusiveOwningSession(); VirtualHost getVirtualHost(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index cf2f637697..b7f3f59c4b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -7,6 +7,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -83,7 +85,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private PrincipalHolder _prinicpalHolder; - private Object _exclusiveOwner; + private AMQSessionModel _exclusiveOwner; private final boolean _durable; @@ -2045,12 +2047,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return ids; } - public Object getExclusiveOwner() + public AMQSessionModel getExclusiveOwningSession() { return _exclusiveOwner; } - public void setExclusiveOwner(Object exclusiveOwner) + public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) { _exclusiveOwner = exclusiveOwner; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 1aff1eec86..58dbc95224 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -22,10 +22,23 @@ package org.apache.qpid.server.transport; import org.apache.qpid.server.configuration.ConnectionConfig; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ConnectionCloseCode; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionDetachCode; +import org.apache.qpid.transport.SessionDetach; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.SessionDetached; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.AMQException; -public class ServerConnection extends Connection +public class ServerConnection extends Connection implements AMQConnectionModel { private ConnectionConfig _config; private Runnable _onOpenTask; @@ -88,4 +101,15 @@ public class ServerConnection extends Connection { _onOpenTask = task; } + + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + { + ExecutionException ex = new ExecutionException(); + ex.setErrorCode(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED); + ex.setDescription(message); + ((ServerSession)session).invoke(ex); + + ((ServerSession)session).close(); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 2195cc4154..52b253c075 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -39,6 +39,8 @@ import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageTransfer; @@ -63,7 +65,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; -public class ServerSession extends Session implements PrincipalHolder, SessionConfig +public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel { private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); @@ -310,7 +312,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } } - public void removeDispositionListener(Method method) + public void removeDispositionListener(Method method) { _messageDispositionListenerMap.remove(method.getId()); } @@ -552,4 +554,15 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo { close(); } + + public Object getID() + { + return getName(); + } + + public AMQConnectionModel getConnectionModel() + { + return (ServerConnection) getConnection(); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 3b0f990377..7dcb268290 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.flow.*; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.*; @@ -813,7 +814,7 @@ public class ServerSessionDelegate extends SessionDelegate if(method.getExclusive()) { queue.setPrincipalHolder((ServerSession)session); - queue.setExclusiveOwner(session); + queue.setExclusiveOwningSession((AMQSessionModel) session); } else if(method.getAutoDelete()) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index dbd51af68c..f02b1f435f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.AMQException; @@ -47,7 +48,7 @@ public class MockAMQQueue implements AMQQueue private PrincipalHolder _principalHolder; - private Object _exclusiveOwner; + private AMQSessionModel _exclusiveOwner; public MockAMQQueue(String name) { @@ -527,12 +528,12 @@ public class MockAMQQueue implements AMQQueue _principalHolder = principalHolder; } - public Object getExclusiveOwner() + public AMQSessionModel getExclusiveOwningSession() { return _exclusiveOwner; } - public void setExclusiveOwner(Object exclusiveOwner) + public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) { _exclusiveOwner = exclusiveOwner; } -- cgit v1.2.1