diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid')
13 files changed, 2166 insertions, 2077 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 7574cc3533..b4f276a45a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -105,8 +105,8 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.TransportException; -public class AMQChannel<T extends AMQProtocolSession<T>> - implements AMQSessionModel<AMQChannel<T>,T>, +public class AMQChannel + implements AMQSessionModel<AMQChannel, AMQProtocolEngine>, AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -158,7 +158,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final T _session; + private final AMQProtocolEngine _connection; private AtomicBoolean _closing = new AtomicBoolean(false); private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); @@ -180,8 +180,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); - private final List<Action<? super AMQChannel<T>>> _taskList = - new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>(); + private final List<Action<? super AMQChannel>> _taskList = + new CopyOnWriteArrayList<Action<? super AMQChannel>>(); private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); @@ -191,17 +191,18 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; + private ChannelMethodProcessor _channelMethodProcessor; - public AMQChannel(T session, int channelId, final MessageStore messageStore) + public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) throws AMQException { - _session = session; + _connection = connection; _channelId = channelId; - _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(), - session.getAuthorizedSubject().getPublicCredentials(), - session.getAuthorizedSubject().getPrivateCredentials()); + _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(), + connection.getAuthorizedSubject().getPublicCredentials(), + connection.getAuthorizedSubject().getPrivateCredentials()); _subject.getPrincipals().add(new SessionPrincipal(this)); _logSubject = new ChannelLogSubject(this); @@ -210,7 +211,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // by default the session is non-transactional _transaction = new AsyncAutoCommitTransaction(_messageStore, this); - _clientDeliveryMethod = session.createDeliveryMethod(_channelId); + _clientDeliveryMethod = connection.createDeliveryMethod(_channelId); _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() { @@ -238,6 +239,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> return null; } }); + _channelMethodProcessor = new ChannelMethodProcessorImpl(this); } @@ -249,7 +251,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> @Override public long getActivityTime() { - return _session.getLastReceivedTime(); + return _connection.getLastReceivedTime(); } }); _txnStarts.incrementAndGet(); @@ -354,7 +356,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> final MessageMetaData messageMetaData = new MessageMetaData(_currentMessage.getMessagePublishInfo(), _currentMessage.getContentHeader(), - getProtocolSession().getLastReceivedTime()); + getConnection().getLastReceivedTime()); final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle); @@ -429,7 +431,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { long bodySize = _currentMessage.getSize(); long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp(); - _session.registerMessageReceived(bodySize, timestamp); + _connection.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } } @@ -442,13 +444,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * Pre-requisite: the current message is judged to have no destination queues. * * @throws AMQConnectionException if the message is mandatory close-on-no-route - * @see AMQProtocolSession#isCloseWhenNoRoute() + * @see AMQProtocolEngine#isCloseWhenNoRoute() */ private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException { boolean mandatory = message.isMandatory(); String description = currentMessageDescription(); - boolean closeOnNoRoute = _session.isCloseWhenNoRoute(); + boolean closeOnNoRoute = _connection.isCloseWhenNoRoute(); if(_logger.isDebugEnabled()) { @@ -457,13 +459,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> description, mandatory, isTransactional(), closeOnNoRoute)); } - if (mandatory && isTransactional() && _session.isCloseWhenNoRoute()) + if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute()) { throw new AMQConnectionException( AMQConstant.NO_ROUTE, "No route for message " + currentMessageDescription(), 0, 0, // default class and method ids - getProtocolSession().getMethodRegistry(), + getConnection().getMethodRegistry(), (Throwable) null); } @@ -564,9 +566,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>> */ public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) - throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, - MessageSource.ConsumerAccessRefused + throws MessageSource.ExistingConsumerPreventsExclusive, + MessageSource.ExistingExclusiveConsumer, + AMQInvalidArgumentException, + MessageSource.ConsumerAccessRefused, ConsumerTagInUseException { if (tag == null) { @@ -575,7 +578,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> if (_tag2SubscriptionTargetMap.containsKey(tag)) { - throw new AMQException("Consumer already exists with same tag: " + tag); + throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag); } ConsumerTarget_0_8 target; @@ -647,27 +650,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } } - catch (AccessControlException e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (MessageSource.ExistingExclusiveConsumer e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (MessageSource.ExistingConsumerPreventsExclusive e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (AMQInvalidArgumentException e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (MessageSource.ConsumerAccessRefused e) + catch (AccessControlException + | MessageSource.ExistingExclusiveConsumer + | MessageSource.ExistingConsumerPreventsExclusive + | AMQInvalidArgumentException + | MessageSource.ConsumerAccessRefused e) { _tag2SubscriptionTargetMap.remove(tag); throw e; @@ -728,7 +715,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> unsubscribeAllConsumers(); - for (Action<? super AMQChannel<T>> task : _taskList) + for (Action<? super AMQChannel> task : _taskList) { task.performAction(this); } @@ -895,9 +882,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> /** * Called to resend all outstanding unacknowledged messages to this same channel. * - * @throws AMQException When something goes wrong. */ - public void resend() throws AMQException + public void resend() { @@ -983,9 +969,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only * acknowledges the single message specified by the delivery tag * - * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException + public void acknowledgeMessage(long deliveryTag, boolean multiple) { Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); @@ -1082,22 +1067,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public boolean isSuspended() { - return _suspended.get() || _closing.get() || _session.isClosing(); - } - - public void commit() throws AMQException - { - commit(null, false); + return _suspended.get() || _closing.get() || _connection.isClosing(); } - public void commit(final Runnable immediateAction, boolean async) throws AMQException + public void commit(final Runnable immediateAction, boolean async) { - if (!isTransactional()) - { - throw new AMQException("Fatal error: commit called on non-transactional channel"); - } if(async && _transaction instanceof LocalTransaction) { @@ -1130,17 +1106,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } - public void rollback() throws AMQException - { - rollback(NULL_TASK); - } - - public void rollback(Runnable postRollbackTask) throws AMQException + public void rollback(Runnable postRollbackTask) { - if (!isTransactional()) - { - throw new AMQException("Fatal error: commit called on non-transactional channel"); - } // stop all subscriptions _rollingBack = true; @@ -1198,7 +1165,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public String toString() { - return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]"; + return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]"; } public void setDefaultQueue(AMQQueue queue) @@ -1217,9 +1184,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>> return _closing.get(); } - public AMQProtocolSession getProtocolSession() + public AMQProtocolEngine getConnection() { - return _session; + return _connection; } public FlowCreditManager getCreditManager() @@ -1262,7 +1229,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> throws AMQException { - AMQMessage message = new AMQMessage(handle, _session.getReference()); + AMQMessage message = new AMQMessage(handle, _connection.getReference()); final BasicContentHeaderProperties properties = incomingMessage.getContentHeader().getProperties(); @@ -1273,7 +1240,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private boolean checkMessageUserId(ContentHeaderBody header) { AMQShortString userID = header.getProperties().getUserId(); - return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); + return (!_messageAuthorizationRequired || _connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); } @@ -1284,14 +1251,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } @Override - public T getConnectionModel() + public AMQProtocolEngine getConnectionModel() { - return _session; + return _connection; } public String getClientID() { - return String.valueOf(_session.getContextKey()); + return String.valueOf(_connection.getContextKey()); } public LogSubject getLogSubject() @@ -1306,13 +1273,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } @Override - public void addDeleteTask(final Action<? super AMQChannel<T>> task) + public void addDeleteTask(final Action<? super AMQChannel> task) { _taskList.add(task); } @Override - public void removeDeleteTask(final Action<? super AMQChannel<T>> task) + public void removeDeleteTask(final Action<? super AMQChannel> task) { _taskList.remove(task); } @@ -1324,8 +1291,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public ChannelMethodProcessor getMethodProcessor() { - // TODO - return null; + return _channelMethodProcessor; } @@ -1356,7 +1322,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public void postCommit() { final ProtocolOutputConverter outputConverter = - _session.getProtocolOutputConverter(); + _connection.getProtocolOutputConverter(); outputConverter.writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), @@ -1479,7 +1445,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public void postCommit() { AMQMessage message = _reference.getMessage(); - _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), + _connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), message, _channelId, @@ -1548,7 +1514,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> @Override public Object getConnectionReference() { - return getProtocolSession().getReference(); + return getConnection().getReference(); } public int getUnacknowledgedMessageCount() @@ -1558,9 +1524,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private void flow(boolean flow) { - MethodRegistry methodRegistry = _session.getMethodRegistry(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); - _session.writeFrame(responseBody.generateFrame(_channelId)); + _connection.writeFrame(responseBody.generateFrame(_channelId)); } @Override @@ -1571,7 +1537,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public VirtualHostImpl getVirtualHost() { - return getProtocolSession().getVirtualHost(); + return getConnection().getVirtualHost(); } public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) @@ -1585,11 +1551,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> */ private void closeConnection(String reason) throws AMQException { - Lock receivedLock = _session.getReceivedLock(); + Lock receivedLock = _connection.getReceivedLock(); receivedLock.lock(); try { - _session.close(AMQConstant.RESOURCE_ERROR, reason); + _connection.close(AMQConstant.RESOURCE_ERROR, reason); } finally { @@ -1597,7 +1563,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } - public void deadLetter(long deliveryTag) throws AMQException + public void deadLetter(long deliveryTag) { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 1d0c0a9b25..4f560b1e74 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -57,6 +57,7 @@ import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; @@ -70,6 +71,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; @@ -84,7 +86,9 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine> +public class AMQProtocolEngine implements ServerProtocolEngine, + AMQConnectionModel<AMQProtocolEngine, AMQChannel>, + AMQVersionAwareProtocolSession { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -103,13 +107,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private VirtualHostImpl<?,?,?> _virtualHost; - private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = - new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); + private final Map<Integer, AMQChannel> _channelMap = + new HashMap<Integer, AMQChannel>(); private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = new CopyOnWriteArrayList<SessionModelListener>(); - @SuppressWarnings("unchecked") - private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; /** * The channels that the latest call to {@link #received(ByteBuffer)} applied to. @@ -118,8 +121,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * * Thread-safety: guarded by {@link #_receivedLock}. */ - private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage = - new HashSet<AMQChannel<AMQProtocolEngine>>(); + private final Set<AMQChannel> _channelsForCurrentMessage = + new HashSet<AMQChannel>(); private AMQDecoder _decoder; @@ -365,7 +368,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { RuntimeException exception = null; - for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage) + for (AMQChannel channel : _channelsForCurrentMessage) { try { @@ -428,7 +431,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); - AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId); + AMQChannel amqChannel = _channelMap.get(channelId); if(amqChannel != null) { // The _receivedLock is already acquired in the caller @@ -638,8 +641,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _logger.info("Closing channel due to: " + e.getMessage()); } - writeFrame(e.getCloseFrame(channelId)); - closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); + AMQConstant errorType = e.getErrorCode(); + if(errorType == null) + { + errorType = AMQConstant.INTERNAL_ERROR; + } + writeFrame(new AMQFrame(channelId, + getMethodRegistry().createChannelCloseBody(errorType.getCode(), + AMQShortString.validValueOf(e.getMessage()), + e.getClassId(), + e.getMethodId()))); + closeChannel(channelId, errorType, e.getMessage()); } else { @@ -730,7 +742,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { - AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); + AMQChannel channel = getAndAssertChannel(channelId); channel.publishContentHeader(body); @@ -738,7 +750,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentBodyReceived(int channelId, ContentBody body) throws AMQException { - AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); + AMQChannel channel = getAndAssertChannel(channelId); channel.publishContentBody(body); } @@ -786,17 +798,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _contextKey = contextKey; } - public List<AMQChannel<AMQProtocolEngine>> getChannels() + public List<AMQChannel> getChannels() { synchronized (_channelMap) { - return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values()); + return new ArrayList<AMQChannel>(_channelMap.values()); } } - public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException + public AMQChannel getAndAssertChannel(int channelId) throws AMQException { - AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); + AMQChannel channel = getChannel(channelId); if (channel == null) { throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); @@ -805,9 +817,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return channel; } - public AMQChannel<AMQProtocolEngine> getChannel(int channelId) + public AMQChannel getChannel(int channelId) { - final AMQChannel<AMQProtocolEngine> channel = + final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); if ((channel == null) || channel.isClosing()) { @@ -824,7 +836,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); } - public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException + public void addChannel(AMQChannel channel) throws AMQException { if (_closed) { @@ -891,52 +903,52 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = value; } - /** - * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue - * subscriptions (this may in turn remove queues if they are auto delete</li> </ul> - * - * @param channelId id of the channel to close - * - * @throws IllegalArgumentException if the channel id is not valid - */ - @Override - public void closeChannel(int channelId) + + void closeChannel(AMQChannel channel) { - closeChannel(channelId, null, null); + closeChannel(channel, null, null, false); + } + + public void closeChannelAndWriteFrame(AMQChannel channel, AMQConstant cause, String message) + { + writeFrame(new AMQFrame(channel.getChannelId(), + getMethodRegistry().createChannelCloseBody(cause.getCode(), + AMQShortString.validValueOf(message), + _methodProcessor.getClassId(), + _methodProcessor.getMethodId()))); + closeChannel(channel, cause, message, true); } public void closeChannel(int channelId, AMQConstant cause, String message) { - final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); + final AMQChannel channel = getChannel(channelId); if (channel == null) { throw new IllegalArgumentException("Unknown channel id"); } - else + closeChannel(channel, cause, message, true); + } + + void closeChannel(AMQChannel channel, AMQConstant cause, String message, boolean mark) + { + int channelId = channel.getChannelId(); + try { - try + channel.close(cause, message); + if(mark) { - channel.close(cause, message); markChannelAwaitingCloseOk(channelId); } - finally - { - removeChannel(channelId); - } + } + finally + { + removeChannel(channelId); } } + public void closeChannelOk(int channelId) { - // todo QPID-847 - This is called from two locations ChannelCloseHandler and ChannelCloseOkHandler. - // When it is the CC_OK_Handler then it makes sense to remove the channel else we will leak memory. - // We do it from the Close Handler as we are sending the OK back to the client. - // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException - // will send a close-ok.. Where we should call removeChannel. - // However, due to the poor exception handling on the client. The client-user will be notified of the - // InvalidArgument and if they then decide to close the session/connection then the there will be time - // for that to occur i.e. a new close method be sent before the exception handling can mark the session closed. - _closingChannelsList.remove(channelId); } @@ -952,7 +964,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ public void removeChannel(int channelId) { - AMQChannel<AMQProtocolEngine> session; + AMQChannel session; synchronized (_channelMap) { session = _channelMap.remove(channelId); @@ -988,7 +1000,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ private void closeAllChannels() { - for (AMQChannel<AMQProtocolEngine> channel : getChannels()) + for (AMQChannel channel : getChannels()) { channel.close(); } @@ -1003,7 +1015,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } /** This must be called when the session is _closed in order to free up any resources managed by the session. */ - @Override public void closeSession() { @@ -1103,16 +1114,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi void closeConnection(AMQConstant errorCode, - String message, int channelId, - int classId, - int methodId) + String message, int channelId) { if (_logger.isInfoEnabled()) { _logger.info("Closing connection due to: " + message); } - closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), classId, methodId))); + closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _methodProcessor.getClassId(), _methodProcessor.getMethodId()))); } private void closeConnection(int channelId, AMQFrame frame) @@ -1137,7 +1146,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } - @Override public void closeProtocolSession() { _network.close(); @@ -1520,7 +1528,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getRemoteAddress()); } - public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message) + public void closeSession(AMQChannel session, AMQConstant cause, String message) { int channelId = session.getChannelId(); closeChannel(channelId, cause, message); @@ -1549,7 +1557,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(!_blocking) { _blocking = true; - for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) + for(AMQChannel channel : _channelMap.values()) { channel.block(); } @@ -1564,7 +1572,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(_blocking) { _blocking = false; - for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) + for(AMQChannel channel : _channelMap.values()) { channel.unblock(); } @@ -1577,9 +1585,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _closed; } - public List<AMQChannel<AMQProtocolEngine>> getSessionModels() + public List<AMQChannel> getSessionModels() { - return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels()); + return new ArrayList<AMQChannel>(getChannels()); } public LogSubject getLogSubject() @@ -1715,31 +1723,26 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _lastWriteTime.get(); } - @Override public boolean isCloseWhenNoRoute() { return _closeWhenNoRoute; } - @Override public boolean isCompressionSupported() { return _compressionSupported && _broker.isMessageCompressionEnabled(); } - @Override public int getMessageCompressionThreshold() { return _messageCompressionThreshold; } - @Override public Broker<?> getBroker() { return _broker; } - @Override public SubjectCreator getSubjectCreator() { return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure()); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java deleted file mode 100644 index 15ed24b78f..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import java.net.SocketAddress; -import java.security.Principal; -import java.util.List; -import java.util.concurrent.locks.Lock; - -import javax.security.auth.Subject; -import javax.security.sasl.SaslServer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodDispatcher; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - - -public interface AMQProtocolSession<T extends AMQProtocolSession<T>> - extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel<T,AMQChannel<T>> -{ - long getSessionID(); - - void setMaxFrameSize(int frameMax); - - long getMaxFrameSize(); - - boolean isClosing(); - - void flushBatched(); - - void setDeferFlush(boolean defer); - - ClientDeliveryMethod createDeliveryMethod(int channelId); - - long getLastReceivedTime(); - - /** - * Return the local socket address for the connection - * - * @return the socket address - */ - SocketAddress getLocalAddress(); - - /** - * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC - * 6). - * - * @return the context key - */ - AMQShortString getContextKey(); - - /** - * Set the context key associated with this session. Context key is described in the AMQ protocol specification (RFC - * 6). - * - * @param contextKey the context key - */ - void setContextKey(AMQShortString contextKey); - - /** - * Get the channel for this session associated with the specified id. A channel id is unique per connection (i.e. - * per session). - * - * @param channelId the channel id which must be valid - * - * @return null if no channel exists, the channel otherwise - */ - AMQChannel<T> getChannel(int channelId); - - /** - * Associate a channel with this session. - * - * @param channel the channel to associate with this session. It is an error to associate the same channel with more - * than one session but this is not validated. - */ - void addChannel(AMQChannel<T> channel) throws AMQException; - - /** - * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue - * subscriptions (this may in turn remove queues if they are auto delete</li> </ul> - * - * @param channelId id of the channel to close - * - * @throws org.apache.qpid.AMQException if an error occurs closing the channel - * @throws IllegalArgumentException if the channel id is not valid - */ - void closeChannel(int channelId) throws AMQException; - - void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException; - - /** - * Marks the specific channel as closed. This will release the lock for that channel id so a new channel can be - * created on that id. - * - * @param channelId id of the channel to close - */ - void closeChannelOk(int channelId); - - /** - * Check to see if this chanel is closing - * - * @param channelId id to check - * @return boolean with state of channel awaiting closure - */ - boolean channelAwaitingClosure(int channelId); - - /** - * Remove a channel from the session but do not close it. - * - * @param channelId - */ - void removeChannel(int channelId); - - /** - * Initialise heartbeats on the session. - * - * @param delay delay in seconds (not ms) - */ - void initHeartbeats(int delay); - - /** This must be called when the session is _closed in order to free up any resources managed by the session. */ - void closeSession(); - - void closeProtocolSession(); - - /** @return a key that uniquely identifies this session */ - Object getKey(); - - /** - * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may - * be bound to multiple addresses this could vary depending on the acceptor this session was created from. - * - * @return a String FQDN - */ - String getLocalFQDN(); - - /** @return the sasl server that can perform authentication for this session. */ - SaslServer getSaslServer(); - - /** - * Set the sasl server that is to perform authentication for this session. - * - * @param saslServer - */ - void setSaslServer(SaslServer saslServer); - - void setClientProperties(FieldTable clientProperties); - - Object getReference(); - - VirtualHostImpl<?,?,?> getVirtualHost(); - - void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException; - - public ProtocolOutputConverter getProtocolOutputConverter(); - - void setAuthorizedSubject(Subject authorizedSubject); - - public java.net.SocketAddress getRemoteAddress(); - - public MethodRegistry getMethodRegistry(); - - public MethodDispatcher getMethodDispatcher(); - - String getClientVersion(); - - long getLastIoTime(); - - long getWrittenBytes(); - - Long getMaximumNumberOfChannels(); - - void setMaximumNumberOfChannels(Long value); - - List<AMQChannel<T>> getChannels(); - - public Principal getPeerPrincipal(); - - Lock getReceivedLock(); - - /** - * Used for 0-8/0-9/0-9-1 connections to choose to close - * the connection when a transactional session receives a 'mandatory' message which - * can't be routed rather than returning the message. - */ - boolean isCloseWhenNoRoute(); - - boolean isCompressionSupported(); - - int getMessageCompressionThreshold(); - - Broker<?> getBroker(); - - SubjectCreator getSubjectCreator(); -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java index d8c7115316..d4c7f151e7 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java @@ -20,6 +20,90 @@ */ package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + public interface ChannelMethodProcessor { + void receiveAccessRequest(AMQShortString realm, + boolean exclusive, + boolean passive, + boolean active, + boolean write, + boolean read); + + void receiveBasicAck(long deliveryTag, boolean multiple); + + void receiveBasicCancel(AMQShortString consumerTag, boolean nowait); + + void receiveBasicConsume(AMQShortString queue, + AMQShortString consumerTag, + boolean noLocal, + boolean noAck, + boolean exclusive, + boolean nowait, + FieldTable arguments); + + void receiveBasicGet(AMQShortString queue, boolean noAck); + + void receiveBasicPublish(AMQShortString exchange, + AMQShortString routingKey, + boolean mandatory, + boolean immediate); + + void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global); + + void receiveBasicRecover(boolean requeue, boolean sync); + + void receiveBasicReject(long deliveryTag, boolean requeue); + + void receiveChannelClose(); + + void receiveChannelCloseOk(); + + void receiveChannelFlow(boolean active); + + void receiveExchangeBound(AMQShortString exchange, AMQShortString queue, AMQShortString routingKey); + + void receiveExchangeDeclare(AMQShortString exchange, + AMQShortString type, + boolean passive, + boolean durable, + boolean autoDelete, + boolean internal, + boolean nowait, + FieldTable arguments); + + void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait); + + void receiveQueueBind(AMQShortString queue, + AMQShortString exchange, + AMQShortString routingKey, + boolean nowait, + FieldTable arguments); + + void receiveQueueDeclare(AMQShortString queueStr, + boolean passive, + boolean durable, + boolean exclusive, + boolean autoDelete, + boolean nowait, + FieldTable arguments); + + void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); + + void receiveQueuePurge(AMQShortString queue, boolean nowait); + + void receiveQueueUnbind(AMQShortString queue, + AMQShortString exchange, + AMQShortString routingKey, + FieldTable arguments); + + void receiveTxSelect(); + + void receiveTxCommit(); + + void receiveTxRollback(); + + } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java new file mode 100644 index 0000000000..5e55d24b92 --- /dev/null +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java @@ -0,0 +1,1484 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import java.security.AccessControlException; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.UUID; + +import org.apache.log4j.Logger; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.AccessRequestOkBody; +import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.BasicGetEmptyBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.MessagePublishInfo; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.MessageOnlyCreditManager; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.NoFactoryForTypeException; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UnknownConfiguredObjectException; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.virtualhost.ExchangeExistsException; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.QueueExistsException; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; +import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class ChannelMethodProcessorImpl implements ChannelMethodProcessor +{ + private static final Logger _logger = Logger.getLogger(ChannelMethodProcessorImpl.class); + + private final AMQChannel _channel; + private final AMQProtocolEngine _connection; + + public ChannelMethodProcessorImpl(final AMQChannel channel) + { + _channel = channel; + _connection = _channel.getConnection(); + } + + @Override + public void receiveAccessRequest(final AMQShortString realm, + final boolean exclusive, + final boolean passive, + final boolean active, + final boolean write, + final boolean read) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion())) + { + closeConnection(AMQConstant.COMMAND_INVALID, + "AccessRequest not present in AMQP versions other than 0-8, 0-9"); + } + else + { + // We don't implement access control class, but to keep clients happy that expect it + // always use the "0" ticket. + AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); + _channel.sync(); + _connection.writeFrame(response.generateFrame(getChannelId())); + } + } + + @Override + public void receiveBasicAck(final long deliveryTag, final boolean multiple) + { + _channel.acknowledgeMessage(deliveryTag, multiple); + } + + @Override + public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait) + { + _channel.unsubscribeConsumer(consumerTag); + if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag); + _channel.sync(); + _connection.writeFrame(cancelOkBody.generateFrame(getChannelId())); + } + } + + @Override + public void receiveBasicConsume(final AMQShortString queueNameStr, + AMQShortString consumerTag, + final boolean noLocal, + final boolean noAck, + final boolean exclusive, + final boolean nowait, + final FieldTable arguments) + { + VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost(); + _channel.sync(); + String queueName = queueNameStr == null ? null : queueNameStr.asString(); + + MessageSource queue = queueName == null ? _channel.getDefaultQueue() : vHost.getQueue(queueName); + final Collection<MessageSource> sources = new HashSet<>(); + if (queue != null) + { + sources.add(queue); + } + else if (vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") + && arguments != null + && arguments.get("x-multiqueue") instanceof Collection) + { + for (Object object : (Collection<Object>) arguments.get("x-multiqueue")) + { + String sourceName = String.valueOf(object); + sourceName = sourceName.trim(); + if (sourceName.length() != 0) + { + MessageSource source = vHost.getMessageSource(sourceName); + if (source == null) + { + sources.clear(); + break; + } + else + { + sources.add(source); + } + } + } + queueName = arguments.get("x-multiqueue").toString(); + } + + if (sources.isEmpty()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("No queue for '" + queueName + "'"); + } + if (queueName != null) + { + closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'"); + } + else + { + closeConnection(AMQConstant.NOT_ALLOWED, "No queue name provided, no default queue defined."); + } + } + else + { + try + { + consumerTag = _channel.consumeFromSource(consumerTag, + sources, + !noAck, + arguments, + exclusive, + noLocal); + if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + } + catch (ConsumerTagInUseException cte) + { + + closeConnection(AMQConstant.NOT_ALLOWED, "Non-unique consumer tag, '" + consumerTag + "'"); + } + catch (AMQInvalidArgumentException ise) + { + closeConnection(AMQConstant.ARGUMENT_INVALID, ise.getMessage()); + + + } + catch (AMQQueue.ExistingExclusiveConsumer e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + + queue.getName() + + " as it already has an existing exclusive consumer"); + + } + catch (AMQQueue.ExistingConsumerPreventsExclusive e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + + queue.getName() + + " exclusively as it already has a consumer"); + + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + + queue.getName() + + " permission denied"); + + } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + + queue.getName() + + " as it already has an incompatible exclusivity policy"); + + } + + } + } + + @Override + public void receiveBasicGet(final AMQShortString queueName, final boolean noAck) + { + VirtualHostImpl vHost = _connection.getVirtualHost(); + _channel.sync(); + AMQQueue queue = + queueName == null ? _channel.getDefaultQueue() : vHost.getQueue(queueName.toString()); + if (queue == null) + { + _logger.info("No queue for '" + queueName + "'"); + if (queueName != null) + { + closeConnection(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'"); + + } + else + { + closeConnection(AMQConstant.NOT_ALLOWED, "No queue name provided, no default queue defined."); + + } + } + else + { + + try + { + if (!performGet(queue, _connection, _channel, !noAck)) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); + + + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + } + catch (MessageSource.ExistingExclusiveConsumer e) + { + closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer"); + } + catch (MessageSource.ExistingConsumerPreventsExclusive e) + { + closeConnection(AMQConstant.INTERNAL_ERROR, + "The GET request has been evaluated as an exclusive consumer, " + + "this is likely due to a programming error in the Qpid broker"); + } + catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) + { + closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an incompatible exclusivity policy"); + } + } + } + + @Override + public void receiveBasicPublish(final AMQShortString exchangeName, + final AMQShortString routingKey, + final boolean mandatory, + final boolean immediate) + { + VirtualHostImpl vHost = _connection.getVirtualHost(); + + MessageDestination destination; + + if (isDefaultExchange(exchangeName)) + { + destination = vHost.getDefaultDestination(); + } + else + { + destination = vHost.getMessageDestination(exchangeName.toString()); + } + + // if the exchange does not exist we raise a channel exception + if (destination == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName); + } + else + { + + MessagePublishInfo info = new MessagePublishInfo(exchangeName, + immediate, + mandatory, + routingKey); + + try + { + _channel.setPublishFrame(info, destination); + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + + } + } + } + + @Override + public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global) + { + _channel.sync(); + _channel.setCredit(prefetchSize, prefetchCount); + + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + + @Override + public void receiveBasicRecover(final boolean requeue, final boolean sync) + { + _channel.resend(); + + if (sync) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); + _channel.sync(); + _connection.writeFrame(recoverOk.generateFrame(getChannelId())); + + } + + } + + @Override + public void receiveBasicReject(final long deliveryTag, final boolean requeue) + { + MessageInstance message = _channel.getUnacknowledgedMessageMap().get(deliveryTag); + + if (message == null) + { + _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); + } + else + { + + if (message.getMessage() == null) + { + _logger.warn("Message has already been purged, unable to Reject."); + } + else + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + + ": Requeue:" + requeue + + " on channel:" + _channel.debugIdentity()); + } + + if (requeue) + { + //this requeue represents a message rejected from the pre-dispatch queue + //therefore we need to amend the delivery counter. + message.decrementDeliveryCount(); + + _channel.requeue(deliveryTag); + } + else + { + // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here + // as it would prevent redelivery + // message.reject(); + + final boolean maxDeliveryCountEnabled = _channel.isMaxDeliveryCountEnabled(deliveryTag); + _logger.debug("maxDeliveryCountEnabled: " + + maxDeliveryCountEnabled + + " deliveryTag " + + deliveryTag); + if (maxDeliveryCountEnabled) + { + final boolean deliveredTooManyTimes = _channel.isDeliveredTooManyTimes(deliveryTag); + _logger.debug("deliveredTooManyTimes: " + + deliveredTooManyTimes + + " deliveryTag " + + deliveryTag); + if (deliveredTooManyTimes) + { + _channel.deadLetter(deliveryTag); + } + else + { + //this requeue represents a message rejected because of a recover/rollback that we + //are not ready to DLQ. We rely on the reject command to resend from the unacked map + //and therefore need to increment the delivery counter so we cancel out the effect + //of the AMQChannel#resend() decrement. + message.incrementDeliveryCount(); + } + } + else + { + _channel.requeue(deliveryTag); + } + } + } + } + } + + @Override + public void receiveChannelClose() + { + _channel.sync(); + _connection.closeChannel(_channel); + + _connection.writeFrame(new AMQFrame(_channel.getChannelId(), + _connection.getMethodRegistry().createChannelCloseOkBody())); + } + + @Override + public void receiveChannelCloseOk() + { + _connection.closeChannelOk(getChannelId()); + } + + @Override + public void receiveChannelFlow(final boolean active) + { + _channel.sync(); + _channel.setSuspended(!active); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(active); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + + @Override + public void receiveExchangeBound(final AMQShortString exchangeName, + final AMQShortString queueName, + final AMQShortString routingKey) + { + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + + _channel.sync(); + + int replyCode; + String replyText; + + if (isDefaultExchange(exchangeName)) + { + if (routingKey == null) + { + if (queueName == null) + { + replyCode = virtualHost.getQueues().isEmpty() + ? ExchangeBoundOkBody.NO_BINDINGS + : ExchangeBoundOkBody.OK; + replyText = null; + + } + else + { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; + replyText = "Queue '" + queueName + "' not found"; + } + else + { + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + } + } + else + { + if (queueName == null) + { + replyCode = virtualHost.getQueue(routingKey.toString()) == null + ? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK + : ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + + replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; + replyText = "Queue '" + queueName + "' not found"; + } + else + { + replyCode = queueName.equals(routingKey) + ? ExchangeBoundOkBody.OK + : ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK; + replyText = null; + } + } + } + } + else + { + ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); + if (exchange == null) + { + + replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND; + replyText = "Exchange '" + exchangeName + "' not found"; + } + else if (routingKey == null) + { + if (queueName == null) + { + if (exchange.hasBindings()) + { + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + replyCode = ExchangeBoundOkBody.NO_BINDINGS; + replyText = null; + } + } + else + { + + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; + replyText = "Queue '" + queueName + "' not found"; + } + else + { + if (exchange.isBound(queue)) + { + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + replyCode = ExchangeBoundOkBody.QUEUE_NOT_BOUND; + replyText = "Queue '" + + queueName + + "' not bound to exchange '" + + exchangeName + + "'"; + } + } + } + } + else if (queueName != null) + { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; + replyText = "Queue '" + queueName + "' not found"; + } + else + { + String bindingKey = routingKey == null ? null : routingKey.asString(); + if (exchange.isBound(bindingKey, queue)) + { + + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK; + replyText = "Queue '" + queueName + "' not bound with routing key '" + + routingKey + "' to exchange '" + exchangeName + "'"; + + } + } + } + else + { + if (exchange.isBound(routingKey == null ? "" : routingKey.asString())) + { + + replyCode = ExchangeBoundOkBody.OK; + replyText = null; + } + else + { + replyCode = ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK; + replyText = + "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'"; + } + } + } + + ExchangeBoundOkBody exchangeBoundOkBody = + methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText)); + + _connection.writeFrame(exchangeBoundOkBody.generateFrame(getChannelId())); + + } + + @Override + public void receiveExchangeDeclare(final AMQShortString exchangeName, + final AMQShortString type, + final boolean passive, + final boolean durable, + final boolean autoDelete, + final boolean internal, + final boolean nowait, + final FieldTable arguments) + { + ExchangeImpl exchange; + VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost(); + if (isDefaultExchange(exchangeName)) + { + if (!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type)) + { + closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: " + + " of type " + + ExchangeDefaults.DIRECT_EXCHANGE_CLASS + + " to " + type + "."); + } + else if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); + _channel.sync(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + + } + else + { + if (passive) + { + exchange = virtualHost.getExchange(exchangeName.toString()); + if (exchange == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); + } + else if (!(type == null || type.length() == 0) && !exchange.getType().equals(type.asString())) + { + + closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + + exchangeName + + " of type " + + exchange.getType() + + " to " + + type + + "."); + } + else if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); + _channel.sync(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + + } + else + { + try + { + String name = exchangeName == null ? null : exchangeName.intern().toString(); + String typeString = type == null ? null : type.intern().toString(); + + Map<String, Object> attributes = new HashMap<String, Object>(); + if (arguments != null) + { + attributes.putAll(FieldTable.convertToMap(arguments)); + } + attributes.put(org.apache.qpid.server.model.Exchange.ID, null); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, name); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, typeString); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + if (!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE)) + { + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + } + exchange = virtualHost.createExchange(attributes); + + if (!nowait) + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); + _channel.sync(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + + } + catch (ReservedExchangeNameException e) + { + closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to declare exchange: " + exchangeName + + " which begins with reserved prefix."); + + + } + catch (ExchangeExistsException e) + { + exchange = e.getExistingExchange(); + if (!new AMQShortString(exchange.getType()).equals(type)) + { + closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + + exchange.getType() + + " to " + type + "."); + + } + } + catch (NoFactoryForTypeException e) + { + closeConnection(AMQConstant.COMMAND_INVALID, "Unknown exchange type '" + + e.getType() + + "' for exchange '" + + exchangeName + + "'"); + + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + + } + catch (UnknownConfiguredObjectException e) + { + // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur + final String message = "Unknown alternate exchange " + + (e.getName() != null + ? "name: \"" + e.getName() + "\"" + : "id: " + e.getId()); + closeConnection(AMQConstant.NOT_FOUND, message); + + } + catch (IllegalArgumentException e) + { + closeConnection(AMQConstant.COMMAND_INVALID, "Error creating exchange '" + + exchangeName + + "': " + + e.getMessage()); + + } + } + } + + } + + @Override + public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait) + { + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + _channel.sync(); + try + { + + if (isDefaultExchange(exchangeStr)) + { + closeConnection(AMQConstant.NOT_ALLOWED, + "Default Exchange cannot be deleted"); + + } + + else + { + final String exchangeName = exchangeStr.toString(); + + final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); + if (exchange == null) + { + closeChannel(AMQConstant.NOT_FOUND, "No such exchange: " + exchangeStr); + } + else + { + virtualHost.removeExchange(exchange, !ifUnused); + + ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody(); + + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + } + } + catch (ExchangeIsAlternateException e) + { + closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange"); + } + catch (RequiredExchangeException e) + { + closeChannel(AMQConstant.NOT_ALLOWED, "Exchange '" + exchangeStr + "' cannot be deleted"); + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + } + } + + @Override + public void receiveQueueBind(final AMQShortString queueName, + final AMQShortString exchange, + AMQShortString routingKey, + final boolean nowait, + final FieldTable argumentsTable) + { + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + AMQQueue<?> queue; + if (queueName == null) + { + + queue = _channel.getDefaultQueue(); + + if (queue != null) + { + if (routingKey == null) + { + routingKey = AMQShortString.valueOf(queue.getName()); + } + else + { + routingKey = routingKey.intern(); + } + } + } + else + { + queue = virtualHost.getQueue(queueName.toString()); + routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey.intern(); + } + + if (queue == null) + { + String message = queueName == null + ? "No default queue defined on channel and queue was null" + : "Queue " + queueName + " does not exist."; + closeChannel(AMQConstant.NOT_FOUND, message); + } + else if (isDefaultExchange(exchange)) + { + closeConnection(AMQConstant.NOT_ALLOWED, + "Cannot bind the queue " + queueName + " to the default exchange" + ); + + } + else + { + + final String exchangeName = exchange.toString(); + + final ExchangeImpl exch = virtualHost.getExchange(exchangeName); + if (exch == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist."); + } + else + { + + try + { + + Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable); + String bindingKey = String.valueOf(routingKey); + + if (!exch.isBound(bindingKey, arguments, queue)) + { + + if (!exch.addBinding(bindingKey, queue, arguments) + && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals( + exch.getType())) + { + exch.replaceBinding(bindingKey, queue, arguments); + } + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Binding queue " + + queue + + " to exchange " + + exch + + " with routing key " + + routingKey); + } + if (!nowait) + { + _channel.sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + } + } + } + } + + @Override + public void receiveQueueDeclare(final AMQShortString queueStr, + final boolean passive, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, + final boolean nowait, + final FieldTable arguments) + { + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + + final AMQShortString queueName; + + // if we aren't given a queue name, we create one which we return to the client + if ((queueStr == null) || (queueStr.length() == 0)) + { + queueName = new AMQShortString("tmp_" + UUID.randomUUID()); + } + else + { + queueName = queueStr.intern(); + } + + AMQQueue queue; + + //TODO: do we need to check that the queue already exists with exactly the same "configuration"? + + + if (passive) + { + queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + closeChannel(AMQConstant.NOT_FOUND, + "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."); + } + else + { + if (!queue.verifySessionAccess(_channel)) + { + closeConnection(AMQConstant.NOT_ALLOWED, + "Queue " + + queue.getName() + + " is exclusive, but not created on this Connection."); + } + else + { + //set this as the default queue on the channel: + _channel.setDefaultQueue(queue); + if (!nowait) + { + _channel.sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody(queueName, + queue.getQueueDepthMessages(), + queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + _logger.info("Queue " + queueName + " declared successfully"); + } + } + } + } + else + { + + try + { + Map<String, Object> attributes = + QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments)); + final String queueNameString = AMQShortString.toString(queueName); + attributes.put(Queue.NAME, queueNameString); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.DURABLE, durable); + + LifetimePolicy lifetimePolicy; + ExclusivityPolicy exclusivityPolicy; + + if (exclusive) + { + lifetimePolicy = autoDelete + ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; + exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; + } + else + { + lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; + exclusivityPolicy = ExclusivityPolicy.NONE; + } + + attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); + attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); + + + queue = virtualHost.createQueue(attributes); + + _channel.setDefaultQueue(queue); + + if (!nowait) + { + _channel.sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody(queueName, + queue.getQueueDepthMessages(), + queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + _logger.info("Queue " + queueName + " declared successfully"); + } + } + catch (QueueExistsException qe) + { + + queue = qe.getExistingQueue(); + + if (!queue.verifySessionAccess(_channel)) + { + closeConnection(AMQConstant.NOT_ALLOWED, + "Queue " + + queue.getName() + + " is exclusive, but not created on this Connection."); + + } + else if (queue.isExclusive() != exclusive) + { + + closeChannel(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different exclusivity (was: " + + queue.isExclusive() + + " requested " + + exclusive + + ")"); + } + else if ((autoDelete + && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) + || (!autoDelete && queue.getLifetimePolicy() != ((exclusive + && !durable) + ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + : LifetimePolicy.PERMANENT))) + { + closeChannel(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different lifetime policy (was: " + + queue.getLifetimePolicy() + + " requested autodelete: " + + autoDelete + + ")"); + } + else if (queue.isDurable() != durable) + { + closeChannel(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different durability (was: " + + queue.isDurable() + + " requested " + + durable + + ")"); + } + else + { + _channel.setDefaultQueue(queue); + if (!nowait) + { + _channel.sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody(queueName, + queue.getQueueDepthMessages(), + queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + _logger.info("Queue " + queueName + " declared successfully"); + } + } + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + } + + } + } + + @Override + public void receiveQueueDelete(final AMQShortString queueName, + final boolean ifUnused, + final boolean ifEmpty, + final boolean nowait) + { + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + _channel.sync(); + AMQQueue queue; + if (queueName == null) + { + + //get the default queue on the channel: + queue = _channel.getDefaultQueue(); + } + else + { + queue = virtualHost.getQueue(queueName.toString()); + } + + if (queue == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); + + } + else + { + if (ifEmpty && !queue.isEmpty()) + { + closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is not empty."); + } + else if (ifUnused && !queue.isUnused()) + { + // TODO - Error code + closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is still used."); + } + else + { + if (!queue.verifySessionAccess(_channel)) + { + closeConnection(AMQConstant.NOT_ALLOWED, + "Queue " + + queue.getName() + + " is exclusive, but not created on this Connection."); + + } + else + { + int purged = 0; + try + { + purged = virtualHost.removeQueue(queue); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + + } + } + } + } + } + + @Override + public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait) + { + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + AMQQueue queue = null; + if (queueName == null && (queue = _channel.getDefaultQueue()) == null) + { + + closeConnection(AMQConstant.NOT_ALLOWED, "No queue specified."); + } + else if ((queueName != null) && (queue = virtualHost.getQueue(queueName.toString())) == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); + } + else if (!queue.verifySessionAccess(_channel)) + { + closeConnection(AMQConstant.NOT_ALLOWED, + "Queue is exclusive, but not created on this Connection." + ); + } + else + { + try + { + long purged = queue.clearQueue(); + if (!nowait) + { + _channel.sync(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + + } + + } + } + + @Override + public void receiveQueueUnbind(final AMQShortString queueName, + final AMQShortString exchange, + AMQShortString routingKey, + final FieldTable arguments) + { + VirtualHostImpl virtualHost = _connection.getVirtualHost(); + + + + final boolean useDefaultQueue = queueName == null; + final AMQQueue queue = useDefaultQueue + ? _channel.getDefaultQueue() + : virtualHost.getQueue(queueName.toString()); + + + if (queue == null) + { + String message = useDefaultQueue + ? "No default queue defined on channel and queue was null" + : "Queue " + queueName + " does not exist."; + closeChannel(AMQConstant.NOT_FOUND, message); + } + else if (isDefaultExchange(exchange)) + { + closeConnection(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + + queue.getName() + + " from the default exchange"); + + } + else + { + + final ExchangeImpl exch = virtualHost.getExchange(exchange.toString()); + + if (exch == null) + { + closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchange + " does not exist."); + } + else if (!exch.hasBinding(String.valueOf(routingKey), queue)) + { + closeChannel(AMQConstant.NOT_FOUND, "No such binding"); + } + else + { + try + { + exch.deleteBinding(String.valueOf(routingKey), queue); + + final AMQMethodBody responseBody = _connection.getMethodRegistry().createQueueUnbindOkBody(); + _channel.sync(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + + } + } + + } + } + + @Override + public void receiveTxSelect() + { + _channel.setLocalTransactional(); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + } + + @Override + public void receiveTxCommit() + { + if (!_channel.isTransactional()) + { + closeChannel(AMQConstant.COMMAND_INVALID, "Fatal error: commit called on non-transactional channel"); + } + _channel.commit(new Runnable() + { + + @Override + public void run() + { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + }, true); + + } + + @Override + public void receiveTxRollback() + { + if (!_channel.isTransactional()) + { + closeChannel(AMQConstant.COMMAND_INVALID, "Fatal error: rollback called on non-transactional channel"); + } + + final MethodRegistry methodRegistry = _connection.getMethodRegistry(); + final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody(); + + Runnable task = new Runnable() + { + + public void run() + { + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } + }; + + _channel.rollback(task); + + //Now resend all the unacknowledged messages back to the original subscribers. + //(Must be done after the TxnRollback-ok response). + // Why, are we not allowed to send messages back to client before the ok method? + _channel.resend(); + } + + private void closeChannel(final AMQConstant cause, final String message) + { + _connection.closeChannelAndWriteFrame(_channel, cause, message); + } + + private void closeConnection(final AMQConstant cause, final String message) + { + _connection.closeConnection(cause, message, getChannelId()); + } + + private int getChannelId() + { + return _channel.getChannelId(); + } + + private boolean isDefaultExchange(final AMQShortString exchangeName) + { + return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName); + } + + public static boolean performGet(final AMQQueue queue, + final AMQProtocolEngine connection, + final AMQChannel channel, + final boolean acks) + throws MessageSource.ExistingConsumerPreventsExclusive, + MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused + { + + final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); + + final GetDeliveryMethod getDeliveryMethod = + new GetDeliveryMethod(singleMessageCredit, connection, channel, queue); + final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() + { + + public void recordMessageDelivery(final ConsumerImpl sub, + final MessageInstance entry, + final long deliveryTag) + { + channel.addUnacknowledgedMessage(entry, deliveryTag, null); + } + }; + + ConsumerTarget_0_8 target; + EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES); + if (acks) + { + + target = ConsumerTarget_0_8.createAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); + } + else + { + target = ConsumerTarget_0_8.createGetNoAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); + } + + ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options); + sub.flush(); + sub.close(); + return getDeliveryMethod.hasDeliveredMessage(); + + + } + + + private static class GetDeliveryMethod implements ClientDeliveryMethod + { + + private final FlowCreditManager _singleMessageCredit; + private final AMQProtocolEngine _connection; + private final AMQChannel _channel; + private final AMQQueue _queue; + private boolean _deliveredMessage; + + public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, + final AMQProtocolEngine connection, + final AMQChannel channel, final AMQQueue queue) + { + _singleMessageCredit = singleMessageCredit; + _connection = connection; + _channel = channel; + _queue = queue; + } + + @Override + public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, + final InstanceProperties props, final long deliveryTag) + { + _singleMessageCredit.useCreditForMessage(message.getSize()); + long size = _connection.getProtocolOutputConverter().writeGetOk(message, + props, + _channel.getChannelId(), + deliveryTag, + _queue.getQueueDepthMessages()); + + _deliveredMessage = true; + return size; + } + + public boolean hasDeliveredMessage() + { + return _deliveredMessage; + } + } + +} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java new file mode 100644 index 0000000000..25c1462060 --- /dev/null +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +public class ConsumerTagInUseException extends Exception +{ + public ConsumerTagInUseException(final String message) + { + super(message); + } +} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 446ba256ab..43982db2fd 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -79,7 +79,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager) throws AMQException + FlowCreditManager creditManager) { return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } @@ -89,7 +89,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen final FieldTable filters, final FlowCreditManager creditManager, final ClientDeliveryMethod deliveryMethod, - final RecordDeliveryMethod recordMethod) throws AMQException + final RecordDeliveryMethod recordMethod) { return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } @@ -106,7 +106,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); @@ -147,7 +146,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager) throws AMQException + FlowCreditManager creditManager) { return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } @@ -170,7 +169,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); @@ -206,7 +204,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen long size; synchronized (getChannel()) { - getChannel().getProtocolSession().setDeferFlush(batch); + getChannel().getConnection().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); size = sendToClient(consumer, message, props, deliveryTag); @@ -248,7 +246,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } @@ -264,7 +261,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager) - throws AMQException { return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } @@ -275,7 +271,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod); } @@ -287,7 +282,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } @@ -307,7 +301,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { - getChannel().getProtocolSession().setDeferFlush(batch); + getChannel().getConnection().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); addUnacknowledgedMessage(entry); @@ -345,7 +339,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) - throws AMQException { super(State.ACTIVE); @@ -473,9 +466,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return _consumerTag; } - public AMQProtocolSession getProtocolSession() + public AMQProtocolEngine getProtocolSession() { - return _channel.getProtocolSession(); + return _channel.getConnection(); } public void restoreCredit(final ServerMessage message) @@ -524,7 +517,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public void confirmAutoClose() { - ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter(); + ProtocolOutputConverter converter = getChannel().getConnection().getProtocolOutputConverter(); converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag()); } @@ -539,9 +532,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public void flushBatched() { - _channel.getProtocolSession().setDeferFlush(false); + _channel.getConnection().setDeferFlush(false); - _channel.getProtocolSession().flushBatched(); + _channel.getConnection().flushBatched(); } protected void addUnacknowledgedMessage(MessageInstance entry) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 7a2fdb05fc..d61eb1b223 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -20,14 +20,13 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.Map; + import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import java.util.Map; - public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor { private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class); @@ -45,7 +44,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor _msgToResend = msgToResend; } - public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException + public boolean callback(final long deliveryTag, MessageInstance message) { message.setRedelivered(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java index a926cd91cd..b616aab126 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java @@ -48,12 +48,12 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { private static final int BASIC_CLASS_ID = 60; - private final AMQProtocolSession _protocolSession; + private final AMQProtocolEngine _connection; private static final AMQShortString GZIP_ENCODING = AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING); - public ProtocolOutputConverterImpl(AMQProtocolSession session) + public ProtocolOutputConverterImpl(AMQProtocolEngine connection) { - _protocolSession = session; + _connection = connection; } @@ -76,7 +76,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } else { - return getMessageConverter(serverMessage).convert(serverMessage, _protocolSession.getVirtualHost()); + return getMessageConverter(serverMessage).convert(serverMessage, _connection.getVirtualHost()); } } @@ -99,7 +99,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter byte[] modifiedContent; // straight through case - boolean compressionSupported = _protocolSession.isCompressionSupported(); + boolean compressionSupported = _connection.isCompressionSupported(); if(msgCompressed && !compressionSupported && (modifiedContent = GZIPUtils.uncompressBufferToArray(message.getContent(0,bodySize))) != null) @@ -115,7 +115,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else if(!msgCompressed && compressionSupported && contentHeaderBody.getProperties().getEncoding()==null - && bodySize > _protocolSession.getMessageCompressionThreshold() + && bodySize > _connection.getMessageCompressionThreshold() && (modifiedContent = GZIPUtils.compressBufferToArray(message.getContent(0, bodySize))) != null) { BasicContentHeaderProperties modifiedProps = @@ -182,7 +182,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter } else { - int maxBodySize = (int) _protocolSession.getMaxFrameSize() - AMQFrame.getFrameOverhead(); + int maxBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead(); int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; @@ -316,7 +316,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public AMQBody createAMQBody() { - return _protocolSession.getMethodRegistry().createBasicDeliverBody(_consumerTag, + return _connection.getMethodRegistry().createBasicDeliverBody(_consumerTag, _deliveryTag, _isRedelivered, _exchangeName, @@ -372,7 +372,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED)); BasicGetOkBody getOkBody = - _protocolSession.getMethodRegistry().createBasicGetOkBody(deliveryTag, + _connection.getMethodRegistry().createBasicGetOkBody(deliveryTag, isRedelivered, exchangeName, routingKey, @@ -387,7 +387,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter { BasicReturnBody basicReturnBody = - _protocolSession.getMethodRegistry().createBasicReturnBody(replyCode, + _connection.getMethodRegistry().createBasicReturnBody(replyCode, replyText, messagePublishInfo.getExchange(), messagePublishInfo.getRoutingKey()); @@ -407,14 +407,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeFrame(AMQDataBlock block) { - _protocolSession.writeFrame(block); + _connection.writeFrame(block); } public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) { - BasicCancelOkBody basicCancelOkBody = _protocolSession.getMethodRegistry().createBasicCancelOkBody(consumerTag); + BasicCancelOkBody basicCancelOkBody = _connection.getMethodRegistry().createBasicCancelOkBody(consumerTag); writeFrame(basicCancelOkBody.generateFrame(channelId)); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java index 6f4c1d91d5..4df880d246 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java @@ -22,12 +22,6 @@ package org.apache.qpid.server.protocol.v0_8; import java.security.AccessControlException; import java.security.PrivilegedAction; -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.UUID; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -37,44 +31,20 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.filter.AMQInvalidArgumentException; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageDestination; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.NoFactoryForTypeException; -import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.UnknownConfiguredObjectException; import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; -import org.apache.qpid.server.virtualhost.ExchangeExistsException; -import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; -import org.apache.qpid.server.virtualhost.QueueExistsException; -import org.apache.qpid.server.virtualhost.RequiredExchangeException; -import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class ServerMethodDispatcherImpl implements MethodDispatcher { private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class); - private final AMQProtocolSession<?> _connection; + private final AMQProtocolEngine _connection; private static interface ChannelAction @@ -82,19 +52,19 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher void onChannel(ChannelMethodProcessor channel); } - public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) + public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection) { return new ServerMethodDispatcherImpl(connection); } - public ServerMethodDispatcherImpl(AMQProtocolSession<?> connection) + public ServerMethodDispatcherImpl(AMQProtocolEngine connection) { _connection = connection; } - protected final AMQProtocolSession<?> getConnection() + protected final AMQProtocolEngine getConnection() { return _connection; } @@ -104,7 +74,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher final AMQChannel channel = _connection.getChannel(channelId); if (channel == null) { - // TODO throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); + closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId); } else { @@ -121,595 +91,169 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException + public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId) { - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveAccessRequest(body.getRealm(), + body.getExclusive(), + body.getPassive(), + body.getActive(), + body.getWrite(), + body.getRead()); + } + } + ); - if(ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()) ) - { - throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9"); - } - - // We don't implement access control class, but to keep clients happy that expect it - // always use the "0" ticket. - AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); - channel.sync(); - _connection.writeFrame(response.generateFrame(channelId)); return true; } - public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException + public boolean dispatchBasicAck(final BasicAckBody body, int channelId) { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicAck(body.getDeliveryTag(), body.getMultiple()); + } + } + ); - if (_logger.isDebugEnabled()) - { - _logger.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId); - } - - final AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - // this method throws an AMQException if the delivery tag is not known - channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple()); return true; } - public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException - { - final AMQChannel channel = _connection.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("BasicCancel: for:" + body.getConsumerTag() + - " nowait:" + body.getNowait()); - } - - channel.unsubscribeConsumer(body.getConsumerTag()); - if (!body.getNowait()) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag()); - channel.sync(); - _connection.writeFrame(cancelOkBody.generateFrame(channelId)); - } + public boolean dispatchBasicCancel(final BasicCancelBody body, int channelId) + { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicCancel(body.getConsumerTag(), + body.getNowait() + ); + } + } + ); return true; } - public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException + public boolean dispatchBasicConsume(final BasicConsumeBody body, int channelId) { - AMQChannel channel = _connection.getChannel(channelId); - VirtualHostImpl<?,?,?> vHost = _connection.getVirtualHost(); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - else - { - channel.sync(); - String queueName = body.getQueue() == null ? null : body.getQueue().asString(); - if (_logger.isDebugEnabled()) - { - _logger.debug("BasicConsume: from '" + queueName + - "' for:" + body.getConsumerTag() + - " nowait:" + body.getNowait() + - " args:" + body.getArguments()); - } - - MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName); - final Collection<MessageSource> sources = new HashSet<>(); - if(queue != null) - { - sources.add(queue); - } - else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") - && body.getArguments() != null - && body.getArguments().get("x-multiqueue") instanceof Collection) - { - for(Object object : (Collection<Object>) body.getArguments().get("x-multiqueue")) - { - String sourceName = String.valueOf(object); - sourceName = sourceName.trim(); - if(sourceName.length() != 0) - { - MessageSource source = vHost.getMessageSource(sourceName); - if(source == null) - { - sources.clear(); - break; - } - else - { - sources.add(source); - } - } - } - queueName = body.getArguments().get("x-multiqueue").toString(); - } - - if (sources.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("No queue for '" + queueName + "'"); - } - if (queueName != null) - { - String msg = "No such queue, '" + queueName + "'"; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry()); - } - else - { - String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, _connection.getMethodRegistry()); - } - } - else - { - final AMQShortString consumerTagName; - - if (body.getConsumerTag() != null) - { - consumerTagName = body.getConsumerTag().intern(false); - } - else - { - consumerTagName = null; - } - - try - { - if(consumerTagName == null || channel.getSubscription(consumerTagName) == null) - { - - AMQShortString consumerTag = channel.consumeFromSource(consumerTagName, - sources, - !body.getNoAck(), - body.getArguments(), - body.getExclusive(), - body.getNoLocal()); - if (!body.getNowait()) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - } - } - else - { - AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg, // replytext - body.getClazz(), - body.getMethod()); - _connection.writeFrame(responseBody.generateFrame(0)); - } - - } - catch (AMQInvalidArgumentException ise) - { - _logger.debug("Closing connection due to invalid selector"); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(), - AMQShortString.validValueOf(ise.getMessage()), - body.getClazz(), - body.getMethod()); - _connection.writeFrame(responseBody.generateFrame(channelId)); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicConsume(body.getQueue(), body.getConsumerTag(), + body.getNoLocal(), body.getNoAck(), + body.getExclusive(), body.getNowait(), + body.getArguments()); + } + } + ); - } - catch (AMQQueue.ExistingExclusiveConsumer e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an existing exclusive consumer", - _connection.getMethodRegistry()); - } - catch (AMQQueue.ExistingConsumerPreventsExclusive e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " exclusively as it already has a consumer", - _connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " permission denied", _connection.getMethodRegistry()); - } - catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an incompatible exclusivity policy", - _connection.getMethodRegistry()); - } - - } - } return true; } - public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException + private void closeConnection(final AMQConstant constant, + final String message) { - - VirtualHostImpl vHost = _connection.getVirtualHost(); - - AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - else - { - channel.sync(); - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString()); - if (queue == null) - { - _logger.info("No queue for '" + body.getQueue() + "'"); - if(body.getQueue()!=null) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND, - "No such queue, '" + body.getQueue() + "'", - _connection.getMethodRegistry()); - } - else - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "No queue name provided, no default queue defined.", - _connection.getMethodRegistry()); - } - } - else - { - - try - { - if (!performGet(queue, _connection, channel, !body.getNoAck())) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); - - - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), _connection.getMethodRegistry()); - } - catch (MessageSource.ExistingExclusiveConsumer e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an exclusive consumer", - _connection.getMethodRegistry()); - } - catch (MessageSource.ExistingConsumerPreventsExclusive e) - { - throw body.getConnectionException(AMQConstant.INTERNAL_ERROR, - "The GET request has been evaluated as an exclusive consumer, " + - "this is likely due to a programming error in the Qpid broker", - _connection.getMethodRegistry()); - } - catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an incompatible exclusivit policy", - _connection.getMethodRegistry()); - } - } - } - return true; + _connection.closeConnection(constant, message, 0); } - public static boolean performGet(final AMQQueue queue, - final AMQProtocolSession session, - final AMQChannel channel, - final boolean acks) - throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused + public boolean dispatchBasicGet(final BasicGetBody body, int channelId) { - final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); - - final GetDeliveryMethod getDeliveryMethod = - new GetDeliveryMethod(singleMessageCredit, session, channel, queue); - final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() - { - - public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) - { - channel.addUnacknowledgedMessage(entry, deliveryTag, null); - } - }; - - ConsumerTarget_0_8 target; - EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, - ConsumerImpl.Option.SEES_REQUEUES); - if(acks) - { - - target = ConsumerTarget_0_8.createAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); - } - else - { - target = ConsumerTarget_0_8.createGetNoAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); - } - - ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options); - sub.flush(); - sub.close(); - return(getDeliveryMethod.hasDeliveredMessage()); - - + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicGet(body.getQueue(), body.getNoAck()); + } + } + ); + return true; } - - private static class GetDeliveryMethod implements ClientDeliveryMethod + public boolean dispatchBasicPublish(final BasicPublishBody body, int channelId) { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicPublish(body.getExchange(), body.getRoutingKey(), + body.getMandatory(), body.getImmediate()); + } + } + ); - private final FlowCreditManager _singleMessageCredit; - private final AMQProtocolSession _session; - private final AMQChannel _channel; - private final AMQQueue _queue; - private boolean _deliveredMessage; - - public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, - final AMQProtocolSession session, - final AMQChannel channel, final AMQQueue queue) - { - _singleMessageCredit = singleMessageCredit; - _session = session; - _channel = channel; - _queue = queue; - } - - @Override - public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, - final InstanceProperties props, final long deliveryTag) - { - _singleMessageCredit.useCreditForMessage(message.getSize()); - long size =_session.getProtocolOutputConverter().writeGetOk(message, - props, - _channel.getChannelId(), - deliveryTag, - _queue.getQueueDepthMessages()); - - _deliveredMessage = true; - return size; - } - - public boolean hasDeliveredMessage() - { - return _deliveredMessage; - } - } - - public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Publish received on channel " + channelId); - } - - AMQShortString exchangeName = body.getExchange(); - VirtualHostImpl vHost = _connection.getVirtualHost(); - - // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? - - MessageDestination destination; - - if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName)) - { - destination = vHost.getDefaultDestination(); - } - else - { - destination = vHost.getMessageDestination(exchangeName.toString()); - } - - // if the exchange does not exist we raise a channel exception - if (destination == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name", - _connection.getMethodRegistry()); - } - else - { - // The partially populated BasicDeliver frame plus the received route body - // is stored in the channel. Once the final body frame has been received - // it is routed to the exchange. - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - MessagePublishInfo info = new MessagePublishInfo(body.getExchange(), - body.getImmediate(), - body.getMandatory(), - body.getRoutingKey()); - info.setExchange(exchangeName); - try - { - channel.setPublishFrame(info, destination); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - } return true; } - public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException + public boolean dispatchBasicQos(final BasicQosBody body, int channelId) { - AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); - - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicQos(body.getPrefetchSize(), body.getPrefetchCount(), + body.getGlobal()); + } + } + ); return true; } - public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException + public boolean dispatchBasicRecover(final BasicRecoverBody body, int channelId) { - _logger.debug("Recover received on protocol session " + _connection - + " and channel " + channelId); - AMQChannel channel = _connection.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - channel.resend(); - - // Qpid 0-8 hacks a synchronous -ok onto recover. - // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant - if(_connection.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); - channel.sync(); - _connection.writeFrame(recoverOk.generateFrame(channelId)); + final boolean sync = _connection.getProtocolVersion().equals(ProtocolVersion.v8_0); - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicRecover(body.getRequeue(), sync); + } + } + ); return true; } - public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException + public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) throws AMQException { - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting:" + body.getDeliveryTag() + - ": Requeue:" + body.getRequeue() + - " on channel:" + channel.debugIdentity()); - } - - long deliveryTag = body.getDeliveryTag(); - - MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag); - - if (message == null) - { - _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); - } - else - { - - if (message.getMessage() == null) - { - _logger.warn("Message has already been purged, unable to Reject."); - } - else - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + - ": Requeue:" + body.getRequeue() + - " on channel:" + channel.debugIdentity()); - } - - if (body.getRequeue()) - { - //this requeue represents a message rejected from the pre-dispatch queue - //therefore we need to amend the delivery counter. - message.decrementDeliveryCount(); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicReject(body.getDeliveryTag(), body.getRequeue()); + } + } + ); - channel.requeue(deliveryTag); - } - else - { - // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here - // as it would prevent redelivery - // message.reject(); - - final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); - _logger.debug("maxDeliveryCountEnabled: " - + maxDeliveryCountEnabled - + " deliveryTag " - + deliveryTag); - if (maxDeliveryCountEnabled) - { - final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); - _logger.debug("deliveredTooManyTimes: " - + deliveredTooManyTimes - + " deliveryTag " - + deliveryTag); - if (deliveredTooManyTimes) - { - channel.deadLetter(body.getDeliveryTag()); - } - else - { - //this requeue represents a message rejected because of a recover/rollback that we - //are not ready to DLQ. We rely on the reject command to resend from the unacked map - //and therefore need to increment the delivery counter so we cancel out the effect - //of the AMQChannel#resend() decrement. - message.incrementDeliveryCount(); - } - } - else - { - channel.requeue(deliveryTag); - } - } - } - } return true; } @@ -720,7 +264,9 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher // Protect the broker against out of order frame request. if (virtualHost == null) { - throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null); + throw new AMQException(AMQConstant.COMMAND_INVALID, + "Virtualhost has not yet been set. ConnectionOpen has not been called.", + null); } _logger.info("Connecting to: " + virtualHost.getName()); @@ -783,30 +329,17 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException { - if (_logger.isInfoEnabled()) - { - _logger.info("Received channel close for id " + channelId - + " citing class " + body.getClassId() + - " and method " + body.getMethodId()); - } - - - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, - "Trying to close unknown channel", - _connection.getMethodRegistry()); - } - channel.sync(); - _connection.closeChannel(channelId); - // Client requested closure so we don't wait for ok we send it - _connection.closeChannelOk(channelId); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveChannelClose(); + } + } + ); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); return true; } @@ -814,32 +347,34 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException { - _logger.info("Received channel-close-ok for channel-id " + channelId); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveChannelCloseOk(); + } + } + ); - // Let the Protocol Session know the channel is now closed. - _connection.closeChannelOk(channelId); return true; } - public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException + public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) throws AMQException { - final AMQProtocolSession<?> connection = getConnection(); - - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.sync(); - channel.setSuspended(!body.getActive()); - _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive()); - connection.writeFrame(responseBody.generateFrame(channelId)); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveChannelFlow(body.getActive()); + } + } + ); return true; } @@ -861,7 +396,8 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher String virtualHostName; if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/') { - virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString(); + virtualHostName = + new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString(); } else { @@ -872,41 +408,41 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher if (virtualHost == null) { - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", - _connection.getMethodRegistry()); + closeConnection(AMQConstant.NOT_FOUND, + "Unknown virtual host: '" + virtualHostName + "'"); + } else { // Check virtualhost access if (virtualHost.getState() != State.ACTIVE) { - throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, - "Virtual host '" + virtualHost.getName() + "' is not active", - _connection.getMethodRegistry()); - } + closeConnection(AMQConstant.CONNECTION_FORCED, + "Virtual host '" + virtualHost.getName() + "' is not active" + ); - _connection.setVirtualHost(virtualHost); - try - { - virtualHost.getSecurityManager().authoriseCreateConnection(_connection); } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (_connection.getContextKey() == null) + else { - _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); - } + _connection.setVirtualHost(virtualHost); + try + { + virtualHost.getSecurityManager().authoriseCreateConnection(_connection); + if (_connection.getContextKey() == null) + { + _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); + } - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); - _connection.writeFrame(responseBody.generateFrame(channelId)); + _connection.writeFrame(responseBody.generateFrame(channelId)); + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); + } + } } return true; } @@ -1067,7 +603,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - if(frameMax <= 0) + if (frameMax <= 0) { frameMax = Integer.MAX_VALUE; } @@ -1089,12 +625,12 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - private void disposeSaslServer(AMQProtocolSession ps) + private void disposeSaslServer(AMQProtocolEngine connection) { - SaslServer ss = ps.getSaslServer(); + SaslServer ss = connection.getSaslServer(); if (ss != null) { - ps.setSaslServer(null); + connection.setSaslServer(null); try { ss.dispose(); @@ -1123,60 +659,66 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher if (ss == null) { - throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, - "Unable to create SASL Server:" + body.getMechanism(), - _connection.getMethodRegistry()); + closeConnection(AMQConstant.RESOURCE_ERROR, + "Unable to create SASL Server:" + body.getMechanism() + ); + } + else + { - _connection.setSaslServer(ss); + _connection.setSaslServer(ss); - final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - //save clientProperties - _connection.setClientProperties(body.getClientProperties()); + final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); + //save clientProperties + _connection.setClientProperties(body.getClientProperties()); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); + switch (authResult.getStatus()) + { + case ERROR: + Exception cause = authResult.getCause(); - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); + _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - ConnectionCloseBody closeBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); + ConnectionCloseBody closeBody = + methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), + // replyCode + AMQConstant.NOT_ALLOWED.getName(), + body.getClazz(), + body.getMethod()); - _connection.writeFrame(closeBody.generateFrame(0)); - disposeSaslServer(_connection); - break; + _connection.writeFrame(closeBody.generateFrame(0)); + disposeSaslServer(_connection); + break; - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - _connection.setAuthorizedSubject(authResult.getSubject()); + case SUCCESS: + if (_logger.isInfoEnabled()) + { + _logger.info("Connected as: " + authResult.getSubject()); + } + _connection.setAuthorizedSubject(authResult.getSubject()); - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - if(frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } + if (frameMax <= 0) + { + frameMax = Integer.MAX_VALUE; + } - ConnectionTuneBody - tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - break; - case CONTINUE: - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); + ConnectionTuneBody + tuneBody = + methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), + frameMax, + broker.getConnection_heartBeatDelay()); + _connection.writeFrame(tuneBody.generateFrame(0)); + break; + case CONTINUE: + ConnectionSecureBody + secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + _connection.writeFrame(secureBody.generateFrame(0)); + } } } catch (SaslException e) @@ -1189,38 +731,33 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException { - final AMQProtocolSession<?> connection = getConnection(); - - if (_logger.isDebugEnabled()) - { - _logger.debug(body); - } + final AMQProtocolEngine connection = getConnection(); connection.initHeartbeats(body.getHeartbeat()); int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - if(brokerFrameMax <= 0) + if (brokerFrameMax <= 0) { brokerFrameMax = Integer.MAX_VALUE; } - if(body.getFrameMax() > (long) brokerFrameMax) + if (body.getFrameMax() > (long) brokerFrameMax) { throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, "Attempt to set max frame size to " + body.getFrameMax() + " greater than the broker will allow: " + brokerFrameMax, body.getClazz(), body.getMethod(), - connection.getMethodRegistry(),null); + connection.getMethodRegistry(), null); } - else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) + else if (body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) { throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, "Attempt to set max frame size to " + body.getFrameMax() + " which is smaller than the specification definined minimum: " + AMQConstant.FRAME_MIN_SIZE.getCode(), body.getClazz(), body.getMethod(), - connection.getMethodRegistry(),null); + connection.getMethodRegistry(), null); } int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax(); connection.setMaxFrameSize(frameMax); @@ -1231,375 +768,60 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public static final int OK = 0; - public static final int EXCHANGE_NOT_FOUND = 1; - public static final int QUEUE_NOT_FOUND = 2; - public static final int NO_BINDINGS = 3; - public static final int QUEUE_NOT_BOUND = 4; - public static final int NO_QUEUE_BOUND_WITH_RK = 5; - public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; - - public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException + public boolean dispatchExchangeBound(final ExchangeBoundBody body, int channelId) { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - - - AMQShortString exchangeName = body.getExchange(); - AMQShortString queueName = body.getQueue(); - AMQShortString routingKey = body.getRoutingKey(); - ExchangeBoundOkBody response; - - if(isDefaultExchange(exchangeName)) - { - if(routingKey == null) - { - if(queueName == null) - { - response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null); - } - else - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - response = methodRegistry.createExchangeBoundOkBody(OK, null); - } - } - } - else - { - if(queueName == null) - { - response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null); - } - else - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null); - } - } - } - } - else - { - ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); - if (exchange == null) - { - - - response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, - AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found")); - } - else if (routingKey == null) - { - if (queueName == null) - { - if (exchange.hasBindings()) - { - response = methodRegistry.createExchangeBoundOkBody(OK, null); - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode - null); // replyText - } - } - else - { - - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - if (exchange.isBound(queue)) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText - } - } - } - } - else if (queueName != null) - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString(); - if (exchange.isBound(bindingKey, queue)) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { - - String message = "Queue '" + queueName + "' not bound with routing key '" + - body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; - - response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - AMQShortString.validValueOf(message)); // replyText - } - } - } - else - { - if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString())) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveExchangeBound(body.getExchange(), body.getQueue(), body.getRoutingKey()); + } + } + ); - response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode - AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() + - "' to exchange '" + exchangeName + "'")); // replyText - } - } - } - _connection.writeFrame(response.generateFrame(channelId)); return true; } - public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException - { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - final AMQShortString exchangeName = body.getExchange(); - if (_logger.isDebugEnabled()) - { - _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName); - } - - ExchangeImpl exchange; - - if(isDefaultExchange(exchangeName)) - { - if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType())) - { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: " - + " of type " - + ExchangeDefaults.DIRECT_EXCHANGE_CLASS - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - _connection.getMethodRegistry(),null); - } - } - else - { - if (body.getPassive()) - { - exchange = virtualHost.getExchange(exchangeName.toString()); - if(exchange == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName, - _connection.getMethodRegistry()); - } - else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString())) - { - - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + - exchangeName + " of type " + exchange.getType() - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - _connection.getMethodRegistry(),null); - } - - } - else - { - try - { - String name = exchangeName == null ? null : exchangeName.intern().toString(); - String type = body.getType() == null ? null : body.getType().intern().toString(); - - Map<String,Object> attributes = new HashMap<String, Object>(); - if(body.getArguments() != null) - { - attributes.putAll(FieldTable.convertToMap(body.getArguments())); - } - attributes.put(org.apache.qpid.server.model.Exchange.ID, null); - attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE)) - { - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); - } - exchange = virtualHost.createExchange(attributes); - - } - catch(ReservedExchangeNameException e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix.", - _connection.getMethodRegistry()); - - } - catch(ExchangeExistsException e) - { - exchange = e.getExistingExchange(); - if(!new AMQShortString(exchange.getType()).equals(body.getType())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " - + exchange.getType() - + " to " + body.getType() + ".", - _connection.getMethodRegistry()); - } - } - catch(NoFactoryForTypeException e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, - "Unknown exchange type '" - + e.getType() - + "' for exchange '" - + exchangeName - + "'", - _connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - catch (UnknownConfiguredObjectException e) - { - // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur - throw body.getConnectionException(AMQConstant.NOT_FOUND, - "Unknown alternate exchange " - + (e.getName() != null - ? "name: \"" + e.getName() + "\"" - : "id: " + e.getId()), - _connection.getMethodRegistry()); - } - catch (IllegalArgumentException e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, - "Error creating exchange '" - + exchangeName - + "': " - + e.getMessage(), - _connection.getMethodRegistry()); - } - } - } + public boolean dispatchExchangeDeclare(final ExchangeDeclareBody body, int channelId) + { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveExchangeDeclare(body.getExchange(), body.getType(), + body.getPassive(), + body.getDurable(), + body.getAutoDelete(), + body.getInternal(), + body.getNowait(), + body.getArguments()); + } + } + ); - if(!body.getNowait()) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); - channel.sync(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - } return true; } - public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException + public boolean dispatchExchangeDelete(final ExchangeDeleteBody body, int channelId) { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - try - { - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Default Exchange cannot be deleted", - _connection.getMethodRegistry()); - } - - final String exchangeName = body.getExchange().toString(); - - final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); - if(exchange == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(), - _connection.getMethodRegistry()); - } - virtualHost.removeExchange(exchange, !body.getIfUnused()); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveExchangeDelete(body.getExchange(), + body.getIfUnused(), + body.getNowait()); + } + } + ); - ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody(); - - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - - catch (ExchangeIsAlternateException e) - { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", - _connection.getMethodRegistry()); - - } - catch (RequiredExchangeException e) - { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, - "Exchange '" + body.getExchange() + "' cannot be deleted", - _connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } return true; } @@ -1608,545 +830,146 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); } - public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException + public boolean dispatchQueueBind(final QueueBindBody body, int channelId) { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveQueueBind(body.getQueue(), + body.getExchange(), + body.getRoutingKey(), + body.getNowait(), + body.getArguments()); + } + } + ); - final AMQQueue queue; - final AMQShortString routingKey; - - final AMQShortString queueName = body.getQueue(); - - if (queueName == null) - { - - queue = channel.getDefaultQueue(); - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, - "No default queue defined on channel and queue was null", - _connection.getMethodRegistry()); - } - - if (body.getRoutingKey() == null) - { - routingKey = AMQShortString.valueOf(queue.getName()); - } - else - { - routingKey = body.getRoutingKey().intern(); - } - } - else - { - queue = virtualHost.getQueue(queueName.toString()); - routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern(); - } - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.", - _connection.getMethodRegistry()); - } - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Cannot bind the queue " + queueName + " to the default exchange", - _connection.getMethodRegistry()); - } - - final String exchangeName = body.getExchange().toString(); - - final ExchangeImpl exch = virtualHost.getExchange(exchangeName); - if (exch == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.", - _connection.getMethodRegistry()); - } - - - try - { - - Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments()); - String bindingKey = String.valueOf(routingKey); - - if (!exch.isBound(bindingKey, arguments, queue)) - { - - if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType())) - { - exch.replaceBinding(bindingKey, queue, arguments); - } - } - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); - } - if (!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - } return true; } - public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException - { - final AMQSessionModel session = _connection.getChannel(channelId); - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - final AMQShortString queueName; + public boolean dispatchQueueDeclare(final QueueDeclareBody body, int channelId) + { + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveQueueDeclare(body.getQueue(), + body.getPassive(), + body.getDurable(), + body.getExclusive(), + body.getAutoDelete(), + body.getNowait(), + body.getArguments()); + } + } + ); - // if we aren't given a queue name, we create one which we return to the client - if ((body.getQueue() == null) || (body.getQueue().length() == 0)) - { - queueName = new AMQShortString("tmp_" + UUID.randomUUID()); - } - else - { - queueName = body.getQueue().intern(); - } - - AMQQueue queue; - - //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - if(body.getPassive()) - { - queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry()); - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " - + queue.getName() - + " is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } - - //set this as the default queue on the channel: - channel.setDefaultQueue(queue); - } - } - else - { - - try - { - - queue = createQueue(channel, queueName, body, virtualHost, _connection); - - } - catch(QueueExistsException qe) - { - - queue = qe.getExistingQueue(); - - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " - + queue.getName() - + " is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } - else if(queue.isExclusive() != body.getExclusive()) - { - - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" - + queue.getName() - + "' with different exclusivity (was: " - + queue.isExclusive() - + " requested " - + body.getExclusive() - + ")", - _connection.getMethodRegistry()); - } - else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) - || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" - + queue.getName() - + "' with different lifetime policy (was: " - + queue.getLifetimePolicy() - + " requested autodelete: " - + body.getAutoDelete() - + ")", - _connection.getMethodRegistry()); - } - else if(queue.isDurable() != body.getDurable()) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" - + queue.getName() - + "' with different durability (was: " - + queue.isDurable() - + " requested " - + body.getDurable() - + ")", - _connection.getMethodRegistry()); - } - - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - //set this as the default queue on the channel: - channel.setDefaultQueue(queue); - } - - if (!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - QueueDeclareOkBody responseBody = - methodRegistry.createQueueDeclareOkBody(queueName, - queue.getQueueDepthMessages(), - queue.getConsumerCount()); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - _logger.info("Queue " + queueName + " declared successfully"); - } return true; } - protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName, - QueueDeclareBody body, - final VirtualHostImpl virtualHost, - final AMQProtocolSession session) - throws AMQException, QueueExistsException - { - - final boolean durable = body.getDurable(); - final boolean autoDelete = body.getAutoDelete(); - final boolean exclusive = body.getExclusive(); - - - Map<String, Object> attributes = - QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); - final String queueNameString = AMQShortString.toString(queueName); - attributes.put(Queue.NAME, queueNameString); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.DURABLE, durable); - - LifetimePolicy lifetimePolicy; - ExclusivityPolicy exclusivityPolicy; - - if(exclusive) - { - lifetimePolicy = autoDelete - ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS - : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; - exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; - } - else - { - lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; - exclusivityPolicy = ExclusivityPolicy.NONE; - } - - attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); - attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); - - - final AMQQueue queue = virtualHost.createQueue(attributes); - - return queue; - } - - public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException + public boolean dispatchQueueDelete(final QueueDeleteBody body, int channelId) { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - AMQQueue queue; - if (body.getQueue() == null) - { - - //get the default queue on the channel: - queue = channel.getDefaultQueue(); - } - else - { - queue = virtualHost.getQueue(body.getQueue().toString()); - } - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", - _connection.getMethodRegistry()); - - } - else - { - if (body.getIfEmpty() && !queue.isEmpty()) - { - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.", - _connection.getMethodRegistry()); - } - else if (body.getIfUnused() && !queue.isUnused()) - { - // TODO - Error code - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.", - _connection.getMethodRegistry()); - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " - + queue.getName() - + " is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveQueueDelete(body.getQueue(), + body.getIfUnused(), + body.getIfEmpty(), + body.getNowait()); + } + } + ); - int purged = 0; - try - { - purged = virtualHost.removeQueue(queue); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - } return true; } - public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException + public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) throws AMQException { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - AMQQueue queue; - if(body.getQueue() == null) - { - - //get the default queue on the channel: - queue = channel.getDefaultQueue(); - if(queue == null) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "No queue specified.", - _connection.getMethodRegistry()); - } - } - else - { - queue = virtualHost.getQueue(body.getQueue().toString()); - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveQueuePurge(body.getQueue(), + body.getNowait()); + } + } + ); - if(queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", - _connection.getMethodRegistry()); - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } - - long purged = 0; - try - { - purged = queue.clearQueue(); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - - if(!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - } - } return true; } public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException { - try - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Commit received on channel " + channelId); - } - AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.commit(new Runnable() - { - - @Override - public void run() - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - }, true); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveTxCommit(); + } + } + ); - - - } - catch (AMQException e) - { - throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage(), - _connection.getMethodRegistry()); - } return true; } public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException { - try - { - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - - - final MethodRegistry methodRegistry = _connection.getMethodRegistry(); - final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody(); - - Runnable task = new Runnable() - { - public void run() - { - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - }; - - channel.rollback(task); - - //Now resend all the unacknowledged messages back to the original subscribers. - //(Must be done after the TxnRollback-ok response). - // Why, are we not allowed to send messages back to client before the ok method? - channel.resend(); - - } - catch (AMQException e) - { - throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage(), - _connection.getMethodRegistry()); - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveTxRollback(); + } + } + ); return true; } public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException { - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - channel.setLocalTransactional(); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveTxSelect(); + } + } + ); return true; } - public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException + public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) throws AMQException { - final AMQProtocolSession<?> connection = getConnection(); - - _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId); - AMQChannel channel = connection.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.sync(); - channel.resend(); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); - connection.writeFrame(recoverOk.generateFrame(channelId)); + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveBasicRecover(body.getRequeue(), true); + } + } + ); return true; } @@ -2168,98 +991,23 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body); } - public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException + public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) throws AMQException { - final AMQProtocolSession<?> connection = getConnection(); - - if (ProtocolVersion.v8_0.equals(connection.getProtocolVersion())) - { - // 0-8 does not support QueueUnbind - throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null); - } - - VirtualHostImpl virtualHost = connection.getVirtualHost(); - - final AMQQueue queue; - final AMQShortString routingKey; - - - AMQChannel channel = connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - if (body.getQueue() == null) - { - - queue = channel.getDefaultQueue(); - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, - "No default queue defined on channel and queue was null", - connection.getMethodRegistry()); - } - - routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false); - - } - else - { - queue = virtualHost.getQueue(body.getQueue().toString()); - routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false); - } - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", - connection.getMethodRegistry()); - } - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Cannot unbind the queue " - + queue.getName() - + " from the default exchange", connection.getMethodRegistry()); - } - - final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString()); - if (exch == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.", - connection.getMethodRegistry()); - } - - if(!exch.hasBinding(String.valueOf(routingKey), queue)) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No such binding", connection.getMethodRegistry()); - } - else - { - try - { - exch.deleteBinding(String.valueOf(routingKey), queue); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - connection.getMethodRegistry()); - } - } - - - if (_logger.isInfoEnabled()) - { - _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); - } + processChannelMethod(channelId, + new ChannelAction() + { + @Override + public void onChannel(final ChannelMethodProcessor channel) + { + channel.receiveQueueUnbind(body.getQueue(), + body.getExchange(), + body.getRoutingKey(), + body.getArguments()); + } + } + ); - final AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody(); - channel.sync(); - connection.writeFrame(responseBody.generateFrame(channelId)); return true; } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java index 40e1af10c6..a5866bb1f3 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java @@ -37,6 +37,8 @@ import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; public class ServerMethodProcessor implements MethodProcessor { private static final Logger LOGGER = Logger.getLogger(ServerMethodProcessor.class); + private int _classId; + private int _methodId; private static interface ChannelAction @@ -92,8 +94,8 @@ public class ServerMethodProcessor implements MethodProcessor mechanisms, locales)); } - _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unexpected method received: ConnectionStart", 0, - ConnectionStartBody.CLASS_ID, ConnectionStartBody.METHOD_ID); + _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unexpected method received: ConnectionStart", 0 + ); } @@ -121,9 +123,8 @@ public class ServerMethodProcessor implements MethodProcessor if (ss == null) { _connection.closeConnection(AMQConstant.RESOURCE_ERROR, - "Unable to create SASL Server:" + mechanism, 0, - ConnectionStartOkBody.CLASS_ID, - ConnectionStartOkBody.METHOD_ID); + "Unable to create SASL Server:" + mechanism, 0 + ); } else { @@ -143,9 +144,8 @@ public class ServerMethodProcessor implements MethodProcessor LOGGER.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); _connection.closeConnection(AMQConstant.NOT_ALLOWED, - AMQConstant.NOT_ALLOWED.getName().toString(), 0, - ConnectionStartOkBody.CLASS_ID, - ConnectionStartOkBody.METHOD_ID); + AMQConstant.NOT_ALLOWED.getName().toString(), 0 + ); disposeSaslServer(); break; @@ -182,8 +182,8 @@ public class ServerMethodProcessor implements MethodProcessor { disposeSaslServer(); - _connection.closeConnection(AMQConstant.RESOURCE_ERROR, "SASL error: " + e.getMessage(), 0, - ConnectionStartOkBody.CLASS_ID, ConnectionStartOkBody.METHOD_ID); + _connection.closeConnection(AMQConstant.RESOURCE_ERROR, "SASL error: " + e.getMessage(), 0 + ); } } @@ -938,6 +938,13 @@ public class ServerMethodProcessor implements MethodProcessor } + @Override + public void setCurrentMethod(final int classId, final int methodId) + { + _classId = classId; + _methodId = methodId; + } + private void disposeSaslServer() { SaslServer ss = _connection.getSaslServer(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java index fcbbadd507..bd7b070cd2 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; - import java.util.Collection; import java.util.Set; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; + public interface UnacknowledgedMessageMap { @@ -37,12 +36,12 @@ public interface UnacknowledgedMessageMap *@param message the message being iterated over @return true to stop iteration, false to continue * @throws AMQException */ - boolean callback(final long deliveryTag, MessageInstance message) throws AMQException; + boolean callback(final long deliveryTag, MessageInstance message); void visitComplete(); } - void visit(Visitor visitor) throws AMQException; + void visit(Visitor visitor); void add(long deliveryTag, MessageInstance message); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java index c33af48d8e..2d39daed1c 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.qpid.AMQException; import org.apache.qpid.server.message.MessageInstance; public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap @@ -82,7 +81,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public void visit(Visitor visitor) throws AMQException + public void visit(Visitor visitor) { synchronized (_lock) { |