summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java63
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java226
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java23
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java17
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java16
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java13
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java114
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java3
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java4
9 files changed, 160 insertions, 319 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b15b3f0bfa..fe1cb624e5 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -30,6 +31,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
@@ -86,7 +89,9 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
-public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel<T extends AMQProtocolSession<T>>
+ implements AMQSessionModel<AMQChannel<T>,T>,
+ AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -140,7 +145,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AMQProtocolSession _session;
+ private final T _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -163,12 +168,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
+ private final List<Action<? super AMQChannel<T>>> _taskList =
+ new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
private final ImmediateAction _immediateAction = new ImmediateAction();
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+ public AMQChannel(T session, int channelId, MessageStore messageStore)
throws AMQException
{
_session = session;
@@ -526,7 +534,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException
+ MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
+ MessageSource.ConsumerAccessRefused
{
if (tag == null)
{
@@ -580,7 +589,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
filterManager = new SimpleFilterManager();
}
- filterManager.add(new FilterSupport.NoLocalFilter(source));
+ final Object connectionReference = getConnectionReference();
+ filterManager.add(new MessageFilter()
+ {
+ @Override
+ public boolean matches(final Filterable message)
+ {
+ return message.getConnectionReference() != connectionReference;
+ }
+ });
}
Consumer sub =
source.addConsumer(target,
@@ -609,6 +626,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_tag2SubscriptionTargetMap.remove(tag);
throw e;
}
+ catch (MessageSource.ConsumerAccessRefused e)
+ {
+ _tag2SubscriptionTargetMap.remove(tag);
+ throw e;
+ }
return tag;
}
@@ -657,6 +679,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
CurrentActor.get().message(_logSubject, operationalLogMessage);
unsubscribeAllConsumers();
+
+ for (Action<? super AMQChannel<T>> task : _taskList)
+ {
+ task.performAction(this);
+ }
+
+
_transaction.rollback();
try
@@ -692,9 +721,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
Consumer sub = me.getValue().getConsumer();
-
- sub.close();
-
+ if(sub != null)
+ {
+ sub.close();
+ }
}
_tag2SubscriptionTargetMap.clear();
@@ -1192,7 +1222,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
return _id;
}
- public AMQConnectionModel getConnectionModel()
+ @Override
+ public T getConnectionModel()
{
return _session;
}
@@ -1208,11 +1239,23 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
@Override
- public int compareTo(AMQSessionModel o)
+ public int compareTo(AMQChannel o)
{
return getId().compareTo(o.getId());
}
+ @Override
+ public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+ {
+ _taskList.add(task);
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+ {
+ _taskList.remove(task);
+ }
+
private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 5e95701e5a..68f1ad7942 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -33,7 +33,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -47,18 +46,15 @@ import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
-import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogActor;
@@ -78,6 +74,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -86,7 +83,7 @@ import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -103,9 +100,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private VirtualHost _virtualHost;
- private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
+ new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
- private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ @SuppressWarnings("unchecked")
+ private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
/**
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
@@ -114,9 +113,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*
* Thread-safety: guarded by {@link #_receivedLock}.
*/
- private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>();
-
- private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
+ private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
+ new HashSet<AMQChannel<AMQProtocolEngine>>();
private final AMQStateManager _stateManager;
@@ -124,10 +122,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private SaslServer _saslServer;
- private Object _lastReceived;
-
- private Object _lastSent;
-
private volatile boolean _closed;
// maximum number of channels this session should have
@@ -136,8 +130,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
- private FieldTable _clientProperties;
- private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<? super AMQProtocolEngine>> _taskList =
+ new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
@@ -153,12 +147,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private long _lastIoTime;
private long _writtenBytes;
- private long _readBytes;
-
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
- private long _createTime = System.currentTimeMillis();
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
@@ -176,6 +167,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private volatile boolean _closeWhenNoRoute;
private volatile boolean _stopped;
+ private long _readBytes;
public AMQProtocolEngine(Broker broker,
NetworkConnection network,
@@ -258,15 +250,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
final long arrivalTime = System.currentTimeMillis();
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
+ _readBytes += msg.remaining();
_receivedLock.lock();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- final int len = dataBlocks.size();
- for (int i = 0; i < len; i++)
+ for (AMQDataBlock dataBlock : dataBlocks)
{
- AMQDataBlock dataBlock = dataBlocks.get(i);
try
{
dataBlockReceived(dataBlock);
@@ -316,7 +307,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private void receivedComplete()
{
- for (AMQChannel channel : _channelsForCurrentMessage)
+ for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage)
{
channel.receivedComplete();
}
@@ -334,7 +325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*/
private void dataBlockReceived(AMQDataBlock message) throws Exception
{
- _lastReceived = message;
if (message instanceof ProtocolInitiation)
{
protocolInitiationReceived((ProtocolInitiation) message);
@@ -363,7 +353,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
- AMQChannel amqChannel = _channelMap.get(channelId);
+ AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId);
if(amqChannel != null)
{
// The _receivedLock is already acquired in the caller
@@ -558,14 +548,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- for (AMQMethodListener listener : _frameListeners)
- {
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
-
if (!wasAnyoneInterested)
{
throw new AMQNoMethodHandlerException(evt);
@@ -611,11 +593,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
catch (Exception e)
{
- for (AMQMethodListener listener : _frameListeners)
- {
- listener.error(e);
- }
-
_logger.error("Unexpected exception while processing frame. Closing connection.", e);
closeProtocolSession();
@@ -625,7 +602,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
- AMQChannel channel = getAndAssertChannel(channelId);
+ AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
channel.publishContentHeader(body);
@@ -633,7 +610,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
- AMQChannel channel = getAndAssertChannel(channelId);
+ AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
channel.publishContentBody(body);
}
@@ -681,17 +658,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_contextKey = contextKey;
}
- public List<AMQChannel> getChannels()
+ public List<AMQChannel<AMQProtocolEngine>> getChannels()
{
synchronized (_channelMap)
{
- return new ArrayList<AMQChannel>(_channelMap.values());
+ return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values());
}
}
- public AMQChannel getAndAssertChannel(int channelId) throws AMQException
+ public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException
{
- AMQChannel channel = getChannel(channelId);
+ AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
if (channel == null)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
@@ -700,9 +677,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return channel;
}
- public AMQChannel getChannel(int channelId)
+ public AMQChannel<AMQProtocolEngine> getChannel(int channelId)
{
- final AMQChannel channel =
+ final AMQChannel<AMQProtocolEngine> channel =
((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
if ((channel == null) || channel.isClosing())
{
@@ -719,7 +696,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
- public void addChannel(AMQChannel channel) throws AMQException
+ public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException
{
if (_closed)
{
@@ -770,7 +747,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_maxNoOfChannels = value;
}
- public void commitTransactions(AMQChannel channel) throws AMQException
+ public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
{
@@ -778,7 +755,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- public void rollbackTransactions(AMQChannel channel) throws AMQException
+ public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
{
@@ -802,7 +779,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void closeChannel(int channelId, AMQConstant cause, String message)
{
- final AMQChannel channel = getChannel(channelId);
+ final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
if (channel == null)
{
throw new IllegalArgumentException("Unknown channel id");
@@ -879,12 +856,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
/**
* Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
- *
- * @throws AMQException if an error occurs while closing any channel
*/
private void closeAllChannels()
{
- for (AMQChannel channel : getChannels())
+ for (AMQChannel<AMQProtocolEngine> channel : getChannels())
{
channel.close();
}
@@ -929,9 +904,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeAllChannels();
- for (Task task : _taskList)
+ for (Action<? super AMQProtocolEngine> task : _taskList)
{
- task.doTask(this);
+ task.performAction(this);
}
synchronized(this)
@@ -961,7 +936,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
catch (InterruptedException e)
{
-
+ // do nothing
}
finally
{
@@ -1027,11 +1002,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
}
- public String dump()
- {
- return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
- }
-
/** @return an object that can be used to identity */
public Object getKey()
{
@@ -1069,10 +1039,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void setClientProperties(FieldTable clientProperties)
{
- _clientProperties = clientProperties;
- if (_clientProperties != null)
+ if (clientProperties != null)
{
- String closeWhenNoRoute = _clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
+ String closeWhenNoRoute = clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
if (closeWhenNoRoute != null)
{
_closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute);
@@ -1082,10 +1051,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
- _clientProduct = _clientProperties.getString(ConnectionStartProperties.PRODUCT);
+ _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
+ _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT);
- String clientId = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
+ String clientId = clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
if (clientId != null)
{
setContextKey(new AMQShortString(clientId));
@@ -1118,11 +1087,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _protocolVersion.getMinorVersion();
}
- public boolean isProtocolVersion(byte major, byte minor)
- {
- return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
- }
-
public MethodRegistry getRegistry()
{
return getMethodRegistry();
@@ -1141,12 +1105,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
- public void addSessionCloseTask(Task task)
+ public void addDeleteTask(Action<? super AMQProtocolEngine> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Task task)
+ public void removeDeleteTask(Action<? super AMQProtocolEngine> task)
{
_taskList.remove(task);
}
@@ -1341,51 +1305,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _clientProduct;
}
- public String getPrincipalAsString()
- {
- return getAuthId();
- }
-
public long getSessionCountLimit()
{
return getMaximumNumberOfChannels();
}
- public Boolean isIncoming()
- {
- return true;
- }
-
- public Boolean isSystemConnection()
- {
- return false;
- }
-
- public Boolean isFederationLink()
- {
- return false;
- }
-
- public String getAuthId()
- {
- return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
- }
-
- public Integer getRemotePID()
- {
- return null;
- }
-
- public String getRemoteProcessName()
- {
- return null;
- }
-
- public Integer getRemoteParentPID()
- {
- return null;
- }
-
public boolean isDurable()
{
return false;
@@ -1401,52 +1325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return String.valueOf(getRemoteAddress());
}
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public Boolean isShadow()
- {
- return false;
- }
-
- public void mgmtClose()
- {
- MethodRegistry methodRegistry = getMethodRegistry();
- ConnectionCloseBody responseBody =
- methodRegistry.createConnectionCloseBody(
- AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("The connection was closed using the broker's management interface."),
- 0,0);
-
- // This seems ugly but because we use closeConnection in both normal
- // broker operation and as part of the management interface it cannot
- // be avoided. The Current Actor will be null when this method is
- // called via the QMF management interface. As such we need to set one.
- boolean removeActor = false;
- if (CurrentActor.get() == null)
- {
- removeActor = true;
- CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
- }
-
- try
- {
- writeFrame(responseBody.generateFrame(0));
-
- closeSession();
-
- }
- finally
- {
- if (removeActor)
- {
- CurrentActor.remove();
- }
- }
- }
-
public void mgmtCloseChannel(int channelId)
{
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1481,14 +1359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- public String getClientID()
- {
- return getContextKey().toString();
- }
-
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+ public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
{
- int channelId = ((AMQChannel)session).getChannelId();
+ int channelId = session.getChannelId();
closeChannel(channelId, cause, message);
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1506,7 +1379,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
getProtocolOutputConverter().getProtocolMajorVersion(),
getProtocolOutputConverter().getProtocolMinorVersion(),
- (Throwable) null));
+ null));
}
public void block()
@@ -1516,7 +1389,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
if(!_blocking)
{
_blocking = true;
- for(AMQChannel channel : _channelMap.values())
+ for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
{
channel.block();
}
@@ -1531,7 +1404,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
if(_blocking)
{
_blocking = false;
- for(AMQChannel channel : _channelMap.values())
+ for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
{
channel.unblock();
}
@@ -1544,9 +1417,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _closed;
}
- public List<AMQSessionModel> getSessionModels()
+ public List<AMQChannel<AMQProtocolEngine>> getSessionModels()
{
- return new ArrayList<AMQSessionModel>(getChannels());
+ return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels());
}
public LogSubject getLogSubject()
@@ -1620,14 +1493,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return String.valueOf(getContextKey());
}
- public void setDeferFlush(boolean deferFlush)
+ @Override
+ public String getRemoteContainerName()
{
- _deferFlush = deferFlush;
+ return String.valueOf(getContextKey());
}
- public String getUserName()
+ public void setDeferFlush(boolean deferFlush)
{
- return getAuthorizedPrincipal().getName();
+ _deferFlush = deferFlush;
}
public final class WriteDeliverMethod
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index 58a3b5df12..045367b667 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -37,12 +37,14 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel
+public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
+ extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel<T,AMQChannel<T>>
{
long getSessionID();
@@ -69,11 +71,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
*/
SocketAddress getLocalAddress();
- public static interface Task
- {
- public void doTask(AMQProtocolSession session);
- }
-
/**
* Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
* 6).
@@ -98,7 +95,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
*
* @return null if no channel exists, the channel otherwise
*/
- AMQChannel getChannel(int channelId);
+ AMQChannel<T> getChannel(int channelId);
/**
* Associate a channel with this session.
@@ -106,7 +103,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
* @param channel the channel to associate with this session. It is an error to associate the same channel with more
* than one session but this is not validated.
*/
- void addChannel(AMQChannel channel) throws AMQException;
+ void addChannel(AMQChannel<T> channel) throws AMQException;
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
@@ -185,10 +182,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
void setVirtualHost(VirtualHost virtualHost) throws AMQException;
- void addSessionCloseTask(Task task);
-
- void removeSessionCloseTask(Task task);
-
public ProtocolOutputConverter getProtocolOutputConverter();
void setAuthorizedSubject(Subject authorizedSubject);
@@ -209,11 +202,11 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
void setMaximumNumberOfChannels(Long value);
- void commitTransactions(AMQChannel channel) throws AMQException;
+ void commitTransactions(AMQChannel<T> channel) throws AMQException;
- void rollbackTransactions(AMQChannel channel) throws AMQException;
+ void rollbackTransactions(AMQChannel<T> channel) throws AMQException;
- List<AMQChannel> getChannels();
+ List<AMQChannel<T>> getChannels();
void mgmtCloseChannel(int channelId);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index ce90de7aac..c93c164978 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -99,16 +99,6 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
final AMQShortString consumerTagName;
- if (queue.isExclusive() && !queue.isDurable())
- {
- AMQSessionModel session = queue.getExclusiveOwningSession();
- if (session == null || session.getConnectionModel() != protocolConnection)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
- }
- }
-
if (body.getConsumerTag() != null)
{
consumerTagName = body.getConsumerTag().intern(false);
@@ -184,6 +174,13 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
+ queue.getName()
+ " permission denied");
}
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an incompatible exclusivity policy");
+ }
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index c9a7cc69a1..43700049e1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -98,15 +98,6 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
- if (queue.isExclusive())
- {
- AMQSessionModel session = queue.getExclusiveOwningSession();
- if (session == null || session.getConnectionModel() != protocolConnection)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue is exclusive, but not created on this Connection.");
- }
- }
try
{
@@ -136,6 +127,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
"The GET request has been evaluated as an exclusive consumer, " +
"this is likely due to a programming error in the Qpid broker");
}
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an incompatible exclusivit policy");
+ }
}
}
}
@@ -145,7 +141,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final AMQChannel channel,
final boolean acks)
throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer
+ MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
{
final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
index 401718db88..9b875ccf39 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
@@ -116,15 +116,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
try
{
- if (queue.isExclusive() && !queue.isDurable())
- {
- AMQSessionModel session = queue.getExclusiveOwningSession();
- if (session == null || session.getConnectionModel() != protocolConnection)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
- }
- }
Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
String bindingKey = String.valueOf(routingKey);
@@ -144,10 +135,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
}
}
}
- catch (AMQException e)
- {
- throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
- }
catch (QpidSecurityException e)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 5b5525643c..215e3f2f23 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -29,6 +29,9 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -39,7 +42,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -96,9 +98,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
else
{
- AMQSessionModel owningSession = queue.getExclusiveOwningSession();
- if (queue.isExclusive() && !queue.isDurable()
- && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+ if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -114,42 +114,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
try
{
- queue = createQueue(queueName, body, virtualHost, protocolConnection);
- queue.setAuthorizationHolder(protocolConnection);
-
- if (body.getExclusive())
- {
- queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
- queue.setAuthorizationHolder(protocolConnection);
-
- if(!body.getDurable())
- {
- final AMQQueue q = queue;
- final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
- {
- public void doTask(AMQProtocolSession session)
- {
- q.setExclusiveOwningSession(null);
- }
- };
- protocolConnection.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new Action<AMQQueue>() {
- public void performAction(AMQQueue queue)
- {
- protocolConnection.removeSessionCloseTask(sessionCloseTask);
- }
- });
- }
- }
+ queue = createQueue(channel, queueName, body, virtualHost, protocolConnection);
}
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- AMQSessionModel owningSession = queue.getExclusiveOwningSession();
- if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+ if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -161,19 +134,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
"Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: "
+ queue.isExclusive() + " requested " + body.getExclusive() + ")");
}
- else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
- + "as exclusive queue with same name "
- + "declared on another client ID('"
- + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
-
- }
- else if(queue.isAutoDelete() != body.getAutoDelete())
+ else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: "
- + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+ "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: "
+ + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")");
}
else if(queue.isDurable() != body.getDurable())
{
@@ -211,7 +177,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
return new AMQShortString("tmp_" + UUID.randomUUID());
}
- protected AMQQueue createQueue(final AMQShortString queueName,
+ protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName,
QueueDeclareBody body,
final VirtualHost virtualHost,
final AMQProtocolSession session)
@@ -222,48 +188,36 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
final boolean autoDelete = body.getAutoDelete();
final boolean exclusive = body.getExclusive();
- String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null;
- Map<String, Object> arguments =
+ Map<String, Object> attributes =
QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
- String queueNameString = AMQShortString.toString(queueName);
- final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName());
+ final String queueNameString = AMQShortString.toString(queueName);
+ attributes.put(Queue.NAME, queueNameString);
+ attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()));
+ attributes.put(Queue.DURABLE, durable);
- final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete,
- exclusive, autoDelete, arguments);
+ LifetimePolicy lifetimePolicy;
+ ExclusivityPolicy exclusivityPolicy;
- if (exclusive && !durable)
+ if(exclusive)
{
- final AMQProtocolSession.Task deleteQueueTask =
- new AMQProtocolSession.Task()
- {
- public void doTask(AMQProtocolSession session)
- {
- if (virtualHost.getQueue(queueName.toString()) == queue)
- {
- try
- {
- virtualHost.removeQueue(queue);
- }
- catch (QpidSecurityException e)
- {
- throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e);
- }
- }
- }
- };
-
- session.addSessionCloseTask(deleteQueueTask);
-
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- session.removeSessionCloseTask(deleteQueueTask);
- }
- });
+ lifetimePolicy = autoDelete
+ ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+ }
+ else
+ {
+ lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+ exclusivityPolicy = ExclusivityPolicy.NONE;
}
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+ final AMQQueue queue = virtualHost.createQueue(channel, attributes);
+
return queue;
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
index 3a9a6dc44e..c939e49aab 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
@@ -105,8 +105,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
}
else
{
- AMQSessionModel session = queue.getExclusiveOwningSession();
- if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
+ if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
index 6d8f8e64fc..9d035a3f57 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
@@ -96,9 +96,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
else
{
- AMQSessionModel session = queue.getExclusiveOwningSession();
-
- if (queue.isExclusive() && (session == null || session.getConnectionModel() != protocolConnection))
+ if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue is exclusive, but not created on this Connection.");