diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache')
9 files changed, 160 insertions, 319 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b15b3f0bfa..fe1cb624e5 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -30,6 +31,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQMethodBody; @@ -86,7 +89,9 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder +public class AMQChannel<T extends AMQProtocolSession<T>> + implements AMQSessionModel<AMQChannel<T>,T>, + AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -140,7 +145,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AMQProtocolSession _session; + private final T _session; private AtomicBoolean _closing = new AtomicBoolean(false); private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); @@ -163,12 +168,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); + private final List<Action<? super AMQChannel<T>>> _taskList = + new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>(); + private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); private final ImmediateAction _immediateAction = new ImmediateAction(); - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) + public AMQChannel(T session, int channelId, MessageStore messageStore) throws AMQException { _session = session; @@ -526,7 +534,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException + MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, + MessageSource.ConsumerAccessRefused { if (tag == null) { @@ -580,7 +589,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { filterManager = new SimpleFilterManager(); } - filterManager.add(new FilterSupport.NoLocalFilter(source)); + final Object connectionReference = getConnectionReference(); + filterManager.add(new MessageFilter() + { + @Override + public boolean matches(final Filterable message) + { + return message.getConnectionReference() != connectionReference; + } + }); } Consumer sub = source.addConsumer(target, @@ -609,6 +626,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _tag2SubscriptionTargetMap.remove(tag); throw e; } + catch (MessageSource.ConsumerAccessRefused e) + { + _tag2SubscriptionTargetMap.remove(tag); + throw e; + } return tag; } @@ -657,6 +679,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F CurrentActor.get().message(_logSubject, operationalLogMessage); unsubscribeAllConsumers(); + + for (Action<? super AMQChannel<T>> task : _taskList) + { + task.performAction(this); + } + + _transaction.rollback(); try @@ -692,9 +721,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F Consumer sub = me.getValue().getConsumer(); - - sub.close(); - + if(sub != null) + { + sub.close(); + } } _tag2SubscriptionTargetMap.clear(); @@ -1192,7 +1222,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return _id; } - public AMQConnectionModel getConnectionModel() + @Override + public T getConnectionModel() { return _session; } @@ -1208,11 +1239,23 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(AMQChannel o) { return getId().compareTo(o.getId()); } + @Override + public void addDeleteTask(final Action<? super AMQChannel<T>> task) + { + _taskList.add(task); + } + + @Override + public void removeDeleteTask(final Action<? super AMQChannel<T>> task) + { + _taskList.remove(task); + } + private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>> { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 5e95701e5a..68f1ad7942 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -33,7 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -47,18 +46,15 @@ import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; -import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogActor; @@ -78,6 +74,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -86,7 +83,7 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession +public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine> { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -103,9 +100,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private VirtualHost _virtualHost; - private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); + private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = + new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); - private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + @SuppressWarnings("unchecked") + private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; /** * The channels that the latest call to {@link #received(ByteBuffer)} applied to. @@ -114,9 +113,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * * Thread-safety: guarded by {@link #_receivedLock}. */ - private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>(); - - private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); + private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage = + new HashSet<AMQChannel<AMQProtocolEngine>>(); private final AMQStateManager _stateManager; @@ -124,10 +122,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private SaslServer _saslServer; - private Object _lastReceived; - - private Object _lastSent; - private volatile boolean _closed; // maximum number of channels this session should have @@ -136,8 +130,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); - private FieldTable _clientProperties; - private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<? super AMQProtocolEngine>> _taskList = + new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>(); private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); private ProtocolOutputConverter _protocolOutputConverter; @@ -153,12 +147,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _lastIoTime; private long _writtenBytes; - private long _readBytes; - private long _maxFrameSize; private final AtomicBoolean _closing = new AtomicBoolean(false); - private long _createTime = System.currentTimeMillis(); private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; @@ -176,6 +167,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private volatile boolean _closeWhenNoRoute; private volatile boolean _stopped; + private long _readBytes; public AMQProtocolEngine(Broker broker, NetworkConnection network, @@ -258,15 +250,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final long arrivalTime = System.currentTimeMillis(); _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; + _readBytes += msg.remaining(); _receivedLock.lock(); try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - final int len = dataBlocks.size(); - for (int i = 0; i < len; i++) + for (AMQDataBlock dataBlock : dataBlocks) { - AMQDataBlock dataBlock = dataBlocks.get(i); try { dataBlockReceived(dataBlock); @@ -316,7 +307,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void receivedComplete() { - for (AMQChannel channel : _channelsForCurrentMessage) + for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage) { channel.receivedComplete(); } @@ -334,7 +325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ private void dataBlockReceived(AMQDataBlock message) throws Exception { - _lastReceived = message; if (message instanceof ProtocolInitiation) { protocolInitiationReceived((ProtocolInitiation) message); @@ -363,7 +353,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); - AMQChannel amqChannel = _channelMap.get(channelId); + AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId); if(amqChannel != null) { // The _receivedLock is already acquired in the caller @@ -558,14 +548,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { boolean wasAnyoneInterested = _stateManager.methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - for (AMQMethodListener listener : _frameListeners) - { - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) { throw new AMQNoMethodHandlerException(evt); @@ -611,11 +593,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (Exception e) { - for (AMQMethodListener listener : _frameListeners) - { - listener.error(e); - } - _logger.error("Unexpected exception while processing frame. Closing connection.", e); closeProtocolSession(); @@ -625,7 +602,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { - AMQChannel channel = getAndAssertChannel(channelId); + AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); channel.publishContentHeader(body); @@ -633,7 +610,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentBodyReceived(int channelId, ContentBody body) throws AMQException { - AMQChannel channel = getAndAssertChannel(channelId); + AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); channel.publishContentBody(body); } @@ -681,17 +658,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _contextKey = contextKey; } - public List<AMQChannel> getChannels() + public List<AMQChannel<AMQProtocolEngine>> getChannels() { synchronized (_channelMap) { - return new ArrayList<AMQChannel>(_channelMap.values()); + return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values()); } } - public AMQChannel getAndAssertChannel(int channelId) throws AMQException + public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException { - AMQChannel channel = getChannel(channelId); + AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); if (channel == null) { throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); @@ -700,9 +677,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return channel; } - public AMQChannel getChannel(int channelId) + public AMQChannel<AMQProtocolEngine> getChannel(int channelId) { - final AMQChannel channel = + final AMQChannel<AMQProtocolEngine> channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); if ((channel == null) || channel.isClosing()) { @@ -719,7 +696,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); } - public void addChannel(AMQChannel channel) throws AMQException + public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException { if (_closed) { @@ -770,7 +747,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = value; } - public void commitTransactions(AMQChannel channel) throws AMQException + public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException { if ((channel != null) && channel.isTransactional()) { @@ -778,7 +755,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public void rollbackTransactions(AMQChannel channel) throws AMQException + public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException { if ((channel != null) && channel.isTransactional()) { @@ -802,7 +779,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void closeChannel(int channelId, AMQConstant cause, String message) { - final AMQChannel channel = getChannel(channelId); + final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); if (channel == null) { throw new IllegalArgumentException("Unknown channel id"); @@ -879,12 +856,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /** * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. - * - * @throws AMQException if an error occurs while closing any channel */ private void closeAllChannels() { - for (AMQChannel channel : getChannels()) + for (AMQChannel<AMQProtocolEngine> channel : getChannels()) { channel.close(); } @@ -929,9 +904,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeAllChannels(); - for (Task task : _taskList) + for (Action<? super AMQProtocolEngine> task : _taskList) { - task.doTask(this); + task.performAction(this); } synchronized(this) @@ -961,7 +936,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (InterruptedException e) { - + // do nothing } finally { @@ -1027,11 +1002,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")"); } - public String dump() - { - return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; - } - /** @return an object that can be used to identity */ public Object getKey() { @@ -1069,10 +1039,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void setClientProperties(FieldTable clientProperties) { - _clientProperties = clientProperties; - if (_clientProperties != null) + if (clientProperties != null) { - String closeWhenNoRoute = _clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); + String closeWhenNoRoute = clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); if (closeWhenNoRoute != null) { _closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute); @@ -1082,10 +1051,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8); - _clientProduct = _clientProperties.getString(ConnectionStartProperties.PRODUCT); + _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8); + _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT); - String clientId = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8); + String clientId = clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8); if (clientId != null) { setContextKey(new AMQShortString(clientId)); @@ -1118,11 +1087,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _protocolVersion.getMinorVersion(); } - public boolean isProtocolVersion(byte major, byte minor) - { - return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); - } - public MethodRegistry getRegistry() { return getMethodRegistry(); @@ -1141,12 +1105,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } - public void addSessionCloseTask(Task task) + public void addDeleteTask(Action<? super AMQProtocolEngine> task) { _taskList.add(task); } - public void removeSessionCloseTask(Task task) + public void removeDeleteTask(Action<? super AMQProtocolEngine> task) { _taskList.remove(task); } @@ -1341,51 +1305,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _clientProduct; } - public String getPrincipalAsString() - { - return getAuthId(); - } - public long getSessionCountLimit() { return getMaximumNumberOfChannels(); } - public Boolean isIncoming() - { - return true; - } - - public Boolean isSystemConnection() - { - return false; - } - - public Boolean isFederationLink() - { - return false; - } - - public String getAuthId() - { - return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); - } - - public Integer getRemotePID() - { - return null; - } - - public String getRemoteProcessName() - { - return null; - } - - public Integer getRemoteParentPID() - { - return null; - } - public boolean isDurable() { return false; @@ -1401,52 +1325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getRemoteAddress()); } - public long getCreateTime() - { - return _createTime; - } - - public Boolean isShadow() - { - return false; - } - - public void mgmtClose() - { - MethodRegistry methodRegistry = getMethodRegistry(); - ConnectionCloseBody responseBody = - methodRegistry.createConnectionCloseBody( - AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("The connection was closed using the broker's management interface."), - 0,0); - - // This seems ugly but because we use closeConnection in both normal - // broker operation and as part of the management interface it cannot - // be avoided. The Current Actor will be null when this method is - // called via the QMF management interface. As such we need to set one. - boolean removeActor = false; - if (CurrentActor.get() == null) - { - removeActor = true; - CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger())); - } - - try - { - writeFrame(responseBody.generateFrame(0)); - - closeSession(); - - } - finally - { - if (removeActor) - { - CurrentActor.remove(); - } - } - } - public void mgmtCloseChannel(int channelId) { MethodRegistry methodRegistry = getMethodRegistry(); @@ -1481,14 +1359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public String getClientID() - { - return getContextKey().toString(); - } - - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message) { - int channelId = ((AMQChannel)session).getChannelId(); + int channelId = session.getChannelId(); closeChannel(channelId, cause, message); MethodRegistry methodRegistry = getMethodRegistry(); @@ -1506,7 +1379,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getProtocolOutputConverter().getProtocolMajorVersion(), getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null)); + null)); } public void block() @@ -1516,7 +1389,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(!_blocking) { _blocking = true; - for(AMQChannel channel : _channelMap.values()) + for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) { channel.block(); } @@ -1531,7 +1404,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(_blocking) { _blocking = false; - for(AMQChannel channel : _channelMap.values()) + for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) { channel.unblock(); } @@ -1544,9 +1417,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _closed; } - public List<AMQSessionModel> getSessionModels() + public List<AMQChannel<AMQProtocolEngine>> getSessionModels() { - return new ArrayList<AMQSessionModel>(getChannels()); + return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels()); } public LogSubject getLogSubject() @@ -1620,14 +1493,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getContextKey()); } - public void setDeferFlush(boolean deferFlush) + @Override + public String getRemoteContainerName() { - _deferFlush = deferFlush; + return String.valueOf(getContextKey()); } - public String getUserName() + public void setDeferFlush(boolean deferFlush) { - return getAuthorizedPrincipal().getName(); + _deferFlush = deferFlush; } public final class WriteDeliverMethod diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index 58a3b5df12..045367b667 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -37,12 +37,14 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.virtualhost.VirtualHost; -public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel +public interface AMQProtocolSession<T extends AMQProtocolSession<T>> + extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel<T,AMQChannel<T>> { long getSessionID(); @@ -69,11 +71,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth */ SocketAddress getLocalAddress(); - public static interface Task - { - public void doTask(AMQProtocolSession session); - } - /** * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC * 6). @@ -98,7 +95,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth * * @return null if no channel exists, the channel otherwise */ - AMQChannel getChannel(int channelId); + AMQChannel<T> getChannel(int channelId); /** * Associate a channel with this session. @@ -106,7 +103,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth * @param channel the channel to associate with this session. It is an error to associate the same channel with more * than one session but this is not validated. */ - void addChannel(AMQChannel channel) throws AMQException; + void addChannel(AMQChannel<T> channel) throws AMQException; /** * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue @@ -185,10 +182,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void setVirtualHost(VirtualHost virtualHost) throws AMQException; - void addSessionCloseTask(Task task); - - void removeSessionCloseTask(Task task); - public ProtocolOutputConverter getProtocolOutputConverter(); void setAuthorizedSubject(Subject authorizedSubject); @@ -209,11 +202,11 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void setMaximumNumberOfChannels(Long value); - void commitTransactions(AMQChannel channel) throws AMQException; + void commitTransactions(AMQChannel<T> channel) throws AMQException; - void rollbackTransactions(AMQChannel channel) throws AMQException; + void rollbackTransactions(AMQChannel<T> channel) throws AMQException; - List<AMQChannel> getChannels(); + List<AMQChannel<T>> getChannels(); void mgmtCloseChannel(int channelId); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index ce90de7aac..c93c164978 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -99,16 +99,6 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { final AMQShortString consumerTagName; - if (queue.isExclusive() && !queue.isDurable()) - { - AMQSessionModel session = queue.getExclusiveOwningSession(); - if (session == null || session.getConnectionModel() != protocolConnection) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); - } - } - if (body.getConsumerTag() != null) { consumerTagName = body.getConsumerTag().intern(false); @@ -184,6 +174,13 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic + queue.getName() + " permission denied"); } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + throw body.getChannelException(AMQConstant.ACCESS_REFUSED, + "Cannot subscribe to queue " + + queue.getName() + + " as it already has an incompatible exclusivity policy"); + } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index c9a7cc69a1..43700049e1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -98,15 +98,6 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } else { - if (queue.isExclusive()) - { - AMQSessionModel session = queue.getExclusiveOwningSession(); - if (session == null || session.getConnectionModel() != protocolConnection) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue is exclusive, but not created on this Connection."); - } - } try { @@ -136,6 +127,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB "The GET request has been evaluated as an exclusive consumer, " + "this is likely due to a programming error in the Qpid broker"); } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue has an incompatible exclusivit policy"); + } } } } @@ -145,7 +141,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final AMQChannel channel, final boolean acks) throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer + MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused { final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index 401718db88..9b875ccf39 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -116,15 +116,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> try { - if (queue.isExclusive() && !queue.isDurable()) - { - AMQSessionModel session = queue.getExclusiveOwningSession(); - if (session == null || session.getConnectionModel() != protocolConnection) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); - } - } Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments()); String bindingKey = String.valueOf(routingKey); @@ -144,10 +135,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } } } - catch (AMQException e) - { - throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); - } catch (QpidSecurityException e) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 5b5525643c..215e3f2f23 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -29,6 +29,9 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -39,7 +42,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; @@ -96,9 +98,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else { - AMQSessionModel owningSession = queue.getExclusiveOwningSession(); - if (queue.isExclusive() && !queue.isDurable() - && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); @@ -114,42 +114,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar try { - queue = createQueue(queueName, body, virtualHost, protocolConnection); - queue.setAuthorizationHolder(protocolConnection); - - if (body.getExclusive()) - { - queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId)); - queue.setAuthorizationHolder(protocolConnection); - - if(!body.getDurable()) - { - final AMQQueue q = queue; - final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() - { - public void doTask(AMQProtocolSession session) - { - q.setExclusiveOwningSession(null); - } - }; - protocolConnection.addSessionCloseTask(sessionCloseTask); - queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void performAction(AMQQueue queue) - { - protocolConnection.removeSessionCloseTask(sessionCloseTask); - } - }); - } - } + queue = createQueue(channel, queueName, body, virtualHost, protocolConnection); } catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - AMQSessionModel owningSession = queue.getExclusiveOwningSession(); - if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); @@ -161,19 +134,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: " + queue.isExclusive() + " requested " + body.getExclusive() + ")"); } - else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection))) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), " - + "as exclusive queue with same name " - + "declared on another client ID('" - + queue.getOwner() + "') your clientID('" + session.getClientID() + "')"); - - } - else if(queue.isAutoDelete() != body.getAutoDelete()) + else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) + || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: " - + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")"); + "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: " + + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")"); } else if(queue.isDurable() != body.getDurable()) { @@ -211,7 +177,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar return new AMQShortString("tmp_" + UUID.randomUUID()); } - protected AMQQueue createQueue(final AMQShortString queueName, + protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName, QueueDeclareBody body, final VirtualHost virtualHost, final AMQProtocolSession session) @@ -222,48 +188,36 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar final boolean autoDelete = body.getAutoDelete(); final boolean exclusive = body.getExclusive(); - String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null; - Map<String, Object> arguments = + Map<String, Object> attributes = QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); - String queueNameString = AMQShortString.toString(queueName); - final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()); + final String queueNameString = AMQShortString.toString(queueName); + attributes.put(Queue.NAME, queueNameString); + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName())); + attributes.put(Queue.DURABLE, durable); - final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete, - exclusive, autoDelete, arguments); + LifetimePolicy lifetimePolicy; + ExclusivityPolicy exclusivityPolicy; - if (exclusive && !durable) + if(exclusive) { - final AMQProtocolSession.Task deleteQueueTask = - new AMQProtocolSession.Task() - { - public void doTask(AMQProtocolSession session) - { - if (virtualHost.getQueue(queueName.toString()) == queue) - { - try - { - virtualHost.removeQueue(queue); - } - catch (QpidSecurityException e) - { - throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e); - } - } - } - }; - - session.addSessionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - session.removeSessionCloseTask(deleteQueueTask); - } - }); + lifetimePolicy = autoDelete + ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; + exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; + } + else + { + lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; + exclusivityPolicy = ExclusivityPolicy.NONE; } + attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); + attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); + + + final AMQQueue queue = virtualHost.createQueue(channel, attributes); + return queue; } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java index 3a9a6dc44e..c939e49aab 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java @@ -105,8 +105,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB } else { - AMQSessionModel session = queue.getExclusiveOwningSession(); - if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java index 6d8f8e64fc..9d035a3f57 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java @@ -96,9 +96,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod } else { - AMQSessionModel session = queue.getExclusiveOwningSession(); - - if (queue.isExclusive() && (session == null || session.getConnectionModel() != protocolConnection)) + if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue is exclusive, but not created on this Connection."); |