diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler')
36 files changed, 1924 insertions, 3978 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java deleted file mode 100644 index df66120731..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java +++ /dev/null @@ -1,76 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AccessRequestBody;
-import org.apache.qpid.framing.AccessRequestOkBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-/**
- * @author Apache Software Foundation
- *
- *
- */
-public class AccessRequestHandler implements StateAwareMethodListener<AccessRequestBody>
-{
- private static final AccessRequestHandler _instance = new AccessRequestHandler();
-
-
- public static AccessRequestHandler getInstance()
- {
- return _instance;
- }
-
- private AccessRequestHandler()
- {
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- AccessRequestBody body,
- int channelId) throws AMQException
- {
- final AMQChannel channel = connection.getChannel(channelId);
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
-
- if(ProtocolVersion.v0_91.equals(connection.getProtocolVersion()) )
- {
- throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
- }
-
- // We don't implement access control class, but to keep clients happy that expect it
- // always use the "0" ticket.
- AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
- channel.sync();
- connection.writeFrame(response.generateFrame(channelId));
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java deleted file mode 100644 index efc91800a1..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody> -{ - private static final Logger _log = Logger.getLogger(BasicAckMethodHandler.class); - - private static final BasicAckMethodHandler _instance = new BasicAckMethodHandler(); - - public static BasicAckMethodHandler getInstance() - { - return _instance; - } - - private BasicAckMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - BasicAckBody body, - int channelId) throws AMQException - { - - if (_log.isDebugEnabled()) - { - _log.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId); - } - - final AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - // this method throws an AMQException if the delivery tag is not known - channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple()); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java deleted file mode 100644 index 16498b3e88..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody> -{ - private static final Logger _log = Logger.getLogger(BasicCancelMethodHandler.class); - - private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler(); - - public static BasicCancelMethodHandler getInstance() - { - return _instance; - } - - private BasicCancelMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - BasicCancelBody body, - int channelId) throws AMQException - { - final AMQChannel channel = connection.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - if (_log.isDebugEnabled()) - { - _log.debug("BasicCancel: for:" + body.getConsumerTag() + - " nowait:" + body.getNowait()); - } - - channel.unsubscribeConsumer(body.getConsumerTag()); - if (!body.getNowait()) - { - MethodRegistry methodRegistry = connection.getMethodRegistry(); - BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag()); - channel.sync(); - connection.writeFrame(cancelOkBody.generateFrame(channelId)); - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java deleted file mode 100644 index b4219fe29c..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; -import java.util.Collection; -import java.util.HashSet; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.filter.AMQInvalidArgumentException; -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody> -{ - private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class); - - private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler(); - - public static BasicConsumeMethodHandler getInstance() - { - return _instance; - } - - private BasicConsumeMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - BasicConsumeBody body, - int channelId) throws AMQException - { - AMQChannel channel = connection.getChannel(channelId); - VirtualHostImpl<?,?,?> vHost = connection.getVirtualHost(); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - else - { - channel.sync(); - String queueName = body.getQueue() == null ? null : body.getQueue().asString(); - if (_logger.isDebugEnabled()) - { - _logger.debug("BasicConsume: from '" + queueName + - "' for:" + body.getConsumerTag() + - " nowait:" + body.getNowait() + - " args:" + body.getArguments()); - } - - MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName); - final Collection<MessageSource> sources = new HashSet<>(); - if(queue != null) - { - sources.add(queue); - } - else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") - && body.getArguments() != null - && body.getArguments().get("x-multiqueue") instanceof Collection) - { - for(Object object : (Collection<Object>)body.getArguments().get("x-multiqueue")) - { - String sourceName = String.valueOf(object); - sourceName = sourceName.trim(); - if(sourceName.length() != 0) - { - MessageSource source = vHost.getMessageSource(sourceName); - if(source == null) - { - sources.clear(); - break; - } - else - { - sources.add(source); - } - } - } - queueName = body.getArguments().get("x-multiqueue").toString(); - } - - if (sources.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("No queue for '" + queueName + "'"); - } - if (queueName != null) - { - String msg = "No such queue, '" + queueName + "'"; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry()); - } - else - { - String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, connection.getMethodRegistry()); - } - } - else - { - final AMQShortString consumerTagName; - - if (body.getConsumerTag() != null) - { - consumerTagName = body.getConsumerTag().intern(false); - } - else - { - consumerTagName = null; - } - - try - { - if(consumerTagName == null || channel.getSubscription(consumerTagName) == null) - { - - AMQShortString consumerTag = channel.consumeFromSource(consumerTagName, - sources, - !body.getNoAck(), - body.getArguments(), - body.getExclusive(), - body.getNoLocal()); - if (!body.getNowait()) - { - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); - connection.writeFrame(responseBody.generateFrame(channelId)); - - } - } - else - { - AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg, // replytext - body.getClazz(), - body.getMethod()); - connection.writeFrame(responseBody.generateFrame(0)); - } - - } - catch (AMQInvalidArgumentException ise) - { - _logger.debug("Closing connection due to invalid selector"); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(), - AMQShortString.validValueOf(ise.getMessage()), - body.getClazz(), - body.getMethod()); - connection.writeFrame(responseBody.generateFrame(channelId)); - - - } - catch (AMQQueue.ExistingExclusiveConsumer e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an existing exclusive consumer", connection.getMethodRegistry()); - } - catch (AMQQueue.ExistingConsumerPreventsExclusive e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " exclusively as it already has a consumer", connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " permission denied", connection.getMethodRegistry()); - } - catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an incompatible exclusivity policy", connection.getMethodRegistry()); - } - - } - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java deleted file mode 100644 index d650292546..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ - -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; -import java.util.EnumSet; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicGetBody; -import org.apache.qpid.framing.BasicGetEmptyBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; -import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; -import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody> -{ - private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); - - private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler(); - - public static BasicGetMethodHandler getInstance() - { - return _instance; - } - - private BasicGetMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - BasicGetBody body, - int channelId) throws AMQException - { - - VirtualHostImpl vHost = connection.getVirtualHost(); - - AMQChannel channel = connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - else - { - channel.sync(); - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString()); - if (queue == null) - { - _log.info("No queue for '" + body.getQueue() + "'"); - if(body.getQueue()!=null) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND, - "No such queue, '" + body.getQueue()+ "'", connection.getMethodRegistry()); - } - else - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "No queue name provided, no default queue defined.", connection.getMethodRegistry()); - } - } - else - { - - try - { - if (!performGet(queue,connection, channel, !body.getNoAck())) - { - MethodRegistry methodRegistry = connection.getMethodRegistry(); - // TODO - set clusterId - BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); - - - connection.writeFrame(responseBody.generateFrame(channelId)); - } - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), connection.getMethodRegistry()); - } - catch (MessageSource.ExistingExclusiveConsumer e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an exclusive consumer", connection.getMethodRegistry()); - } - catch (MessageSource.ExistingConsumerPreventsExclusive e) - { - throw body.getConnectionException(AMQConstant.INTERNAL_ERROR, - "The GET request has been evaluated as an exclusive consumer, " + - "this is likely due to a programming error in the Qpid broker", - connection.getMethodRegistry()); - } - catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an incompatible exclusivit policy", connection.getMethodRegistry()); - } - } - } - } - - public static boolean performGet(final AMQQueue queue, - final AMQProtocolSession session, - final AMQChannel channel, - final boolean acks) - throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused - { - - final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); - - final GetDeliveryMethod getDeliveryMethod = - new GetDeliveryMethod(singleMessageCredit, session, channel, queue); - final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() - { - - public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) - { - channel.addUnacknowledgedMessage(entry, deliveryTag, null); - } - }; - - ConsumerTarget_0_8 target; - EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, - ConsumerImpl.Option.SEES_REQUEUES); - if(acks) - { - - target = ConsumerTarget_0_8.createAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); - } - else - { - target = ConsumerTarget_0_8.createGetNoAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); - } - - ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options); - sub.flush(); - sub.close(); - return(getDeliveryMethod.hasDeliveredMessage()); - - - } - - - private static class GetDeliveryMethod implements ClientDeliveryMethod - { - - private final FlowCreditManager _singleMessageCredit; - private final AMQProtocolSession _session; - private final AMQChannel _channel; - private final AMQQueue _queue; - private boolean _deliveredMessage; - - public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, - final AMQProtocolSession session, - final AMQChannel channel, final AMQQueue queue) - { - _singleMessageCredit = singleMessageCredit; - _session = session; - _channel = channel; - _queue = queue; - } - - @Override - public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, - final InstanceProperties props, final long deliveryTag) - { - _singleMessageCredit.useCreditForMessage(message.getSize()); - long size =_session.getProtocolOutputConverter().writeGetOk(message, - props, - _channel.getChannelId(), - deliveryTag, - _queue.getQueueDepthMessages()); - - _deliveredMessage = true; - return size; - } - - public boolean hasDeliveredMessage() - { - return _deliveredMessage; - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java deleted file mode 100644 index 41f727c7d4..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.MessagePublishInfo; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.message.MessageDestination; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody> -{ - private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class); - - private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler(); - - - public static BasicPublishMethodHandler getInstance() - { - return _instance; - } - - private BasicPublishMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - BasicPublishBody body, - int channelId) throws AMQException - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Publish received on channel " + channelId); - } - - AMQShortString exchangeName = body.getExchange(); - VirtualHostImpl vHost = connection.getVirtualHost(); - - // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? - - MessageDestination destination; - - if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName)) - { - destination = vHost.getDefaultDestination(); - } - else - { - destination = vHost.getMessageDestination(exchangeName.toString()); - } - - // if the exchange does not exist we raise a channel exception - if (destination == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name", - connection.getMethodRegistry()); - } - else - { - // The partially populated BasicDeliver frame plus the received route body - // is stored in the channel. Once the final body frame has been received - // it is routed to the exchange. - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - MessagePublishInfo info = new MessagePublishInfo(body.getExchange(), - body.getImmediate(), - body.getMandatory(), - body.getRoutingKey()); - info.setExchange(exchangeName); - try - { - channel.setPublishFrame(info, destination); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - connection.getMethodRegistry()); - } - } - } - -} - - - diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java deleted file mode 100644 index 9464be4c6e..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> -{ - private static final BasicQosHandler _instance = new BasicQosHandler(); - - public static BasicQosHandler getInstance() - { - return _instance; - } - - public void methodReceived(final AMQProtocolSession<?> connection, - BasicQosBody body, - int channelId) throws AMQException - { - AMQChannel channel = connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.sync(); - channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); - - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); - connection.writeFrame(responseBody.generateFrame(channelId)); - - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java deleted file mode 100644 index 29ddf4421a..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody> -{ - private static final Logger _logger = Logger.getLogger(BasicRecoverMethodHandler.class); - - private static final BasicRecoverMethodHandler _instance = new BasicRecoverMethodHandler(); - - public static BasicRecoverMethodHandler getInstance() - { - return _instance; - } - - public void methodReceived(final AMQProtocolSession<?> connection, - BasicRecoverBody body, - int channelId) throws AMQException - { - _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId); - AMQChannel channel = connection.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - channel.resend(); - - // Qpid 0-8 hacks a synchronous -ok onto recover. - // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant - if(connection.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); - channel.sync(); - connection.writeFrame(recoverOk.generateFrame(channelId)); - - } - - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java deleted file mode 100644 index b75492a65d..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
-{
- private static final Logger _logger = Logger.getLogger(BasicRecoverSyncMethodHandler.class);
-
- private static final BasicRecoverSyncMethodHandler _instance = new BasicRecoverSyncMethodHandler();
-
- public static BasicRecoverSyncMethodHandler getInstance()
- {
- return _instance;
- }
-
- public void methodReceived(final AMQProtocolSession<?> connection,
- BasicRecoverSyncBody body,
- int channelId) throws AMQException
- {
-
- _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
- AMQChannel channel = connection.getChannel(channelId);
-
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
- }
- channel.sync();
- channel.resend();
-
- MethodRegistry methodRegistry = connection.getMethodRegistry();
- AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- connection.writeFrame(recoverOk.generateFrame(channelId));
-
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java deleted file mode 100644 index 1f299893f9..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicRejectBody; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody> -{ - private static final Logger _logger = Logger.getLogger(BasicRejectMethodHandler.class); - - private static BasicRejectMethodHandler _instance = new BasicRejectMethodHandler(); - - public static BasicRejectMethodHandler getInstance() - { - return _instance; - } - - private BasicRejectMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - BasicRejectBody body, - int channelId) throws AMQException - { - - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting:" + body.getDeliveryTag() + - ": Requeue:" + body.getRequeue() + - " on channel:" + channel.debugIdentity()); - } - - long deliveryTag = body.getDeliveryTag(); - - MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag); - - if (message == null) - { - _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); - } - else - { - - if (message.getMessage() == null) - { - _logger.warn("Message has already been purged, unable to Reject."); - return; - } - - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + - ": Requeue:" + body.getRequeue() + - " on channel:" + channel.debugIdentity()); - } - - if (body.getRequeue()) - { - //this requeue represents a message rejected from the pre-dispatch queue - //therefore we need to amend the delivery counter. - message.decrementDeliveryCount(); - - channel.requeue(deliveryTag); - } - else - { - // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here - // as it would prevent redelivery - // message.reject(); - - final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); - _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag); - if (maxDeliveryCountEnabled) - { - final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); - _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag); - if (deliveredTooManyTimes) - { - channel.deadLetter(body.getDeliveryTag()); - } - else - { - //this requeue represents a message rejected because of a recover/rollback that we - //are not ready to DLQ. We rely on the reject command to resend from the unacked map - //and therefore need to increment the delivery counter so we cancel out the effect - //of the AMQChannel#resend() decrement. - message.incrementDeliveryCount(); - } - } - else - { - channel.requeue(deliveryTag); - } - } - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java deleted file mode 100644 index a9e52c5240..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody> -{ - private static final Logger _logger = Logger.getLogger(ChannelCloseHandler.class); - - private static ChannelCloseHandler _instance = new ChannelCloseHandler(); - - public static ChannelCloseHandler getInstance() - { - return _instance; - } - - private ChannelCloseHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ChannelCloseBody body, - int channelId) throws AMQException - { - - if (_logger.isInfoEnabled()) - { - _logger.info("Received channel close for id " + channelId + " citing class " + body.getClassId() + - " and method " + body.getMethodId()); - } - - - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel", connection.getMethodRegistry()); - } - channel.sync(); - connection.closeChannel(channelId); - // Client requested closure so we don't wait for ok we send it - connection.closeChannelOk(channelId); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody(); - connection.writeFrame(responseBody.generateFrame(channelId)); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java deleted file mode 100644 index fe9d20e151..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody> -{ - private static final Logger _logger = Logger.getLogger(ChannelCloseOkHandler.class); - - private static ChannelCloseOkHandler _instance = new ChannelCloseOkHandler(); - - public static ChannelCloseOkHandler getInstance() - { - return _instance; - } - - private ChannelCloseOkHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ChannelCloseOkBody body, - int channelId) throws AMQException - { - - _logger.info("Received channel-close-ok for channel-id " + channelId); - - // Let the Protocol Session know the channel is now closed. - connection.closeChannelOk(channelId); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java deleted file mode 100644 index 99c0e3b2de..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody> -{ - private static final Logger _logger = Logger.getLogger(ChannelFlowHandler.class); - - private static ChannelFlowHandler _instance = new ChannelFlowHandler(); - - public static ChannelFlowHandler getInstance() - { - return _instance; - } - - private ChannelFlowHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ChannelFlowBody body, - int channelId) throws AMQException - { - - - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.sync(); - channel.setSuspended(!body.getActive()); - _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive()); - connection.writeFrame(responseBody.generateFrame(channelId)); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java deleted file mode 100644 index cb1a59ba2a..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody> -{ - private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class); - - private static ChannelOpenHandler _instance = new ChannelOpenHandler(); - - public static ChannelOpenHandler getInstance() - { - return _instance; - } - - private ChannelOpenHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ChannelOpenBody body, - int channelId) throws AMQException - { - VirtualHostImpl virtualHost = connection.getVirtualHost(); - - // Protect the broker against out of order frame request. - if (virtualHost == null) - { - throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null); - } - _logger.info("Connecting to: " + virtualHost.getName()); - - final AMQChannel channel = new AMQChannel(connection,channelId, virtualHost.getMessageStore()); - - connection.addChannel(channel); - - ChannelOpenOkBody response; - - - response = connection.getMethodRegistry().createChannelOpenOkBody(); - - - - connection.writeFrame(response.generateFrame(channelId)); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java deleted file mode 100644 index c4a8eb4acb..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> -{ - private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class); - - private static ConnectionCloseMethodHandler _instance = new ConnectionCloseMethodHandler(); - - public static ConnectionCloseMethodHandler getInstance() - { - return _instance; - } - - private ConnectionCloseMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ConnectionCloseBody body, - int channelId) throws AMQException - { - if (_logger.isInfoEnabled()) - { - _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + - body.getReplyText() + " for " + connection); - } - try - { - connection.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); - connection.writeFrame(responseBody.generateFrame(channelId)); - - connection.closeProtocolSession(); - - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java deleted file mode 100644 index 03c43cc80a..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody> -{ - private static final Logger _logger = Logger.getLogger(ConnectionCloseOkMethodHandler.class); - - private static ConnectionCloseOkMethodHandler _instance = new ConnectionCloseOkMethodHandler(); - - public static ConnectionCloseOkMethodHandler getInstance() - { - return _instance; - } - - private ConnectionCloseOkMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ConnectionCloseOkBody body, - int channelId) throws AMQException - { - //todo should this not do more than just log the method? - _logger.info("Received Connection-close-ok"); - - try - { - connection.changeState(AMQState.CONNECTION_CLOSED); - connection.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java deleted file mode 100644 index 20c5e90f5d..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionOpenBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody> -{ - private static final Logger _logger = Logger.getLogger(ConnectionOpenMethodHandler.class); - - private static ConnectionOpenMethodHandler _instance = new ConnectionOpenMethodHandler(); - - public static ConnectionOpenMethodHandler getInstance() - { - return _instance; - } - - private ConnectionOpenMethodHandler() - { - } - - private static AMQShortString generateClientID() - { - return new AMQShortString(Long.toString(System.currentTimeMillis())); - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ConnectionOpenBody body, - int channelId) throws AMQException - { - - //ignore leading '/' - String virtualHostName; - if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/') - { - virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString(); - } - else - { - virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost()); - } - - VirtualHostImpl virtualHost = ((AmqpPort)connection.getPort()).getVirtualHost(virtualHostName); - - if (virtualHost == null) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", - connection.getMethodRegistry()); - } - else - { - // Check virtualhost access - if (virtualHost.getState() != State.ACTIVE) - { - throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active", - connection.getMethodRegistry()); - } - - connection.setVirtualHost(virtualHost); - try - { - virtualHost.getSecurityManager().authoriseCreateConnection(connection); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); - } - - // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (connection.getContextKey() == null) - { - connection.setContextKey(generateClientID()); - } - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); - - connection.changeState(AMQState.CONNECTION_OPEN); - - connection.writeFrame(responseBody.generateFrame(channelId)); - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java deleted file mode 100644 index 001719759a..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - - -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; - -public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener<ConnectionSecureOkBody> -{ - private static final Logger _logger = Logger.getLogger(ConnectionSecureOkMethodHandler.class); - - private static ConnectionSecureOkMethodHandler _instance = new ConnectionSecureOkMethodHandler(); - - public static ConnectionSecureOkMethodHandler getInstance() - { - return _instance; - } - - private ConnectionSecureOkMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ConnectionSecureOkBody body, - int channelId) throws AMQException - { - Broker<?> broker = connection.getBroker(); - - SubjectCreator subjectCreator = connection.getSubjectCreator(); - - SaslServer ss = connection.getSaslServer(); - if (ss == null) - { - throw new AMQException("No SASL context set up in session"); - } - MethodRegistry methodRegistry = connection.getMethodRegistry(); - SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); - - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - // This should be abstracted - connection.changeState(AMQState.CONNECTION_CLOSING); - - ConnectionCloseBody connectionCloseBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); - - connection.writeFrame(connectionCloseBody.generateFrame(0)); - disposeSaslServer(connection); - break; - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - connection.changeState(AMQState.CONNECTION_NOT_TUNED); - - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if(frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } - - ConnectionTuneBody tuneBody = - methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - connection.writeFrame(tuneBody.generateFrame(0)); - connection.setAuthorizedSubject(authResult.getSubject()); - disposeSaslServer(connection); - break; - case CONTINUE: - connection.changeState(AMQState.CONNECTION_NOT_AUTH); - - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - connection.writeFrame(secureBody.generateFrame(0)); - } - } - - private void disposeSaslServer(AMQProtocolSession ps) - { - SaslServer ss = ps.getSaslServer(); - if (ss != null) - { - ps.setSaslServer(null); - try - { - ss.dispose(); - } - catch (SaslException e) - { - _logger.error("Error disposing of Sasl server: " + e); - } - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java deleted file mode 100644 index 328c03bf74..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; - - -public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<ConnectionStartOkBody> -{ - private static final Logger _logger = Logger.getLogger(ConnectionStartOkMethodHandler.class); - - private static ConnectionStartOkMethodHandler _instance = new ConnectionStartOkMethodHandler(); - - public static ConnectionStartOkMethodHandler getInstance() - { - return _instance; - } - - private ConnectionStartOkMethodHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ConnectionStartOkBody body, - int channelId) throws AMQException - { - Broker<?> broker = connection.getBroker(); - - _logger.info("SASL Mechanism selected: " + body.getMechanism()); - _logger.info("Locale selected: " + body.getLocale()); - - SubjectCreator subjectCreator = connection.getSubjectCreator(); - SaslServer ss = null; - try - { - ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), - connection.getLocalFQDN(), - connection.getPeerPrincipal()); - - if (ss == null) - { - throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism(), - connection.getMethodRegistry()); - } - - connection.setSaslServer(ss); - - final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - //save clientProperties - connection.setClientProperties(body.getClientProperties()); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); - - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - connection.changeState(AMQState.CONNECTION_CLOSING); - - ConnectionCloseBody closeBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); - - connection.writeFrame(closeBody.generateFrame(0)); - disposeSaslServer(connection); - break; - - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - connection.setAuthorizedSubject(authResult.getSubject()); - - connection.changeState(AMQState.CONNECTION_NOT_TUNED); - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if(frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } - - ConnectionTuneBody - tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - connection.writeFrame(tuneBody.generateFrame(0)); - break; - case CONTINUE: - connection.changeState(AMQState.CONNECTION_NOT_AUTH); - - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - connection.writeFrame(secureBody.generateFrame(0)); - } - } - catch (SaslException e) - { - disposeSaslServer(connection); - throw new AMQException("SASL error: " + e, e); - } - } - - private void disposeSaslServer(AMQProtocolSession ps) - { - SaslServer ss = ps.getSaslServer(); - if (ss != null) - { - ps.setSaslServer(null); - try - { - ss.dispose(); - } - catch (SaslException e) - { - _logger.error("Error disposing of Sasl server: " + e); - } - } - } - -} - - - diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java deleted file mode 100644 index d5f066063d..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody> -{ - private static final Logger _logger = Logger.getLogger(ConnectionTuneOkMethodHandler.class); - - private static ConnectionTuneOkMethodHandler _instance = new ConnectionTuneOkMethodHandler(); - - public static ConnectionTuneOkMethodHandler getInstance() - { - return _instance; - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ConnectionTuneOkBody body, - int channelId) throws AMQException - { - - if (_logger.isDebugEnabled()) - { - _logger.debug(body); - } - connection.changeState(AMQState.CONNECTION_NOT_OPENED); - - connection.initHeartbeats(body.getHeartbeat()); - - int brokerFrameMax = connection.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE); - if(brokerFrameMax <= 0) - { - brokerFrameMax = Integer.MAX_VALUE; - } - - if(body.getFrameMax() > (long) brokerFrameMax) - { - throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, - "Attempt to set max frame size to " + body.getFrameMax() - + "greater than the broker will allow: " - + brokerFrameMax, - body.getClazz(), body.getMethod(), - connection.getMethodRegistry(),null); - } - else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) - { - throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, - "Attempt to set max frame size to " + body.getFrameMax() - + "which is smaller than the specification definined minimum: " - + AMQConstant.FRAME_MIN_SIZE.getCode(), - body.getClazz(), body.getMethod(), - connection.getMethodRegistry(),null); - } - int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax(); - connection.setMaxFrameSize(frameMax); - - long maxChannelNumber = body.getChannelMax(); - //0 means no implied limit, except that forced by protocol limitations (0xFFFF) - connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java deleted file mode 100644 index 22e377c219..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ExchangeBoundBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -/** - * @author Apache Software Foundation - * - * - */ -public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody> -{ - private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler(); - - public static final int OK = 0; - - public static final int EXCHANGE_NOT_FOUND = 1; - - public static final int QUEUE_NOT_FOUND = 2; - - public static final int NO_BINDINGS = 3; - - public static final int QUEUE_NOT_BOUND = 4; - - public static final int NO_QUEUE_BOUND_WITH_RK = 5; - - public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; - - public static ExchangeBoundHandler getInstance() - { - return _instance; - } - - private ExchangeBoundHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ExchangeBoundBody body, - int channelId) throws AMQException - { - VirtualHostImpl virtualHost = connection.getVirtualHost(); - MethodRegistry methodRegistry = connection.getMethodRegistry(); - - final AMQChannel channel = connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.sync(); - - - AMQShortString exchangeName = body.getExchange(); - AMQShortString queueName = body.getQueue(); - AMQShortString routingKey = body.getRoutingKey(); - ExchangeBoundOkBody response; - - if(isDefaultExchange(exchangeName)) - { - if(routingKey == null) - { - if(queueName == null) - { - response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null); - } - else - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - response = methodRegistry.createExchangeBoundOkBody(OK, null); - } - } - } - else - { - if(queueName == null) - { - response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null); - } - else - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null); - } - } - } - } - else - { - ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); - if (exchange == null) - { - - - response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, - AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found")); - } - else if (routingKey == null) - { - if (queueName == null) - { - if (exchange.hasBindings()) - { - response = methodRegistry.createExchangeBoundOkBody(OK, null); - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode - null); // replyText - } - } - else - { - - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - if (exchange.isBound(queue)) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText - } - } - } - } - else if (queueName != null) - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString(); - if (exchange.isBound(bindingKey, queue)) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { - - String message = "Queue '" + queueName + "' not bound with routing key '" + - body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; - - response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - AMQShortString.validValueOf(message)); // replyText - } - } - } - else - { - if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString())) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode - AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() + - "' to exchange '" + exchangeName + "'")); // replyText - } - } - } - connection.writeFrame(response.generateFrame(channelId)); - } - - protected boolean isDefaultExchange(final AMQShortString exchangeName) - { - return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); - } - -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java deleted file mode 100644 index f90f47d77c..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.NoFactoryForTypeException; -import org.apache.qpid.server.model.UnknownConfiguredObjectException; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.ExchangeExistsException; -import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody> -{ - private static final Logger _logger = Logger.getLogger(ExchangeDeclareHandler.class); - - private static final ExchangeDeclareHandler _instance = new ExchangeDeclareHandler(); - - public static ExchangeDeclareHandler getInstance() - { - return _instance; - } - - private ExchangeDeclareHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ExchangeDeclareBody body, - int channelId) throws AMQException - { - VirtualHostImpl virtualHost = connection.getVirtualHost(); - final AMQChannel channel = connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - final AMQShortString exchangeName = body.getExchange(); - if (_logger.isDebugEnabled()) - { - _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName); - } - - ExchangeImpl exchange; - - if(isDefaultExchange(exchangeName)) - { - if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType())) - { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: " - + " of type " - + ExchangeDefaults.DIRECT_EXCHANGE_CLASS - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - connection.getMethodRegistry(),null); - } - } - else - { - if (body.getPassive()) - { - exchange = virtualHost.getExchange(exchangeName.toString()); - if(exchange == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName, - connection.getMethodRegistry()); - } - else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString())) - { - - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + - exchangeName + " of type " + exchange.getType() - + " to " + body.getType() +".",body.getClazz(), body.getMethod(),connection.getMethodRegistry(),null); - } - - } - else - { - try - { - String name = exchangeName == null ? null : exchangeName.intern().toString(); - String type = body.getType() == null ? null : body.getType().intern().toString(); - - Map<String,Object> attributes = new HashMap<String, Object>(); - if(body.getArguments() != null) - { - attributes.putAll(FieldTable.convertToMap(body.getArguments())); - } - attributes.put(org.apache.qpid.server.model.Exchange.ID, null); - attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE)) - { - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); - } - exchange = virtualHost.createExchange(attributes); - - } - catch(ReservedExchangeNameException e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix.", connection.getMethodRegistry()); - - } - catch(ExchangeExistsException e) - { - exchange = e.getExistingExchange(); - if(!new AMQShortString(exchange.getType()).equals(body.getType())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " - + exchange.getType() - + " to " + body.getType() + ".", - connection.getMethodRegistry()); - } - } - catch(NoFactoryForTypeException e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"+e.getType()+"' for exchange '" + exchangeName + "'", connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); - } - catch (UnknownConfiguredObjectException e) - { - // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur - throw body.getConnectionException(AMQConstant.NOT_FOUND, - "Unknown alternate exchange " - + (e.getName() != null - ? "name: \"" + e.getName() + "\"" - : "id: " + e.getId()), - connection.getMethodRegistry()); - } - catch (IllegalArgumentException e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange '"+exchangeName+"': " + e.getMessage(),connection.getMethodRegistry()); - } - } - } - - if(!body.getNowait()) - { - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); - channel.sync(); - connection.writeFrame(responseBody.generateFrame(channelId)); - } - } - - protected boolean isDefaultExchange(final AMQShortString exchangeName) - { - return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java deleted file mode 100644 index b5c10c190e..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ExchangeDeleteBody; -import org.apache.qpid.framing.ExchangeDeleteOkBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; -import org.apache.qpid.server.virtualhost.RequiredExchangeException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody> -{ - private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler(); - - public static ExchangeDeleteHandler getInstance() - { - return _instance; - } - - private ExchangeDeleteHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - ExchangeDeleteBody body, - int channelId) throws AMQException - { - VirtualHostImpl virtualHost = connection.getVirtualHost(); - final AMQChannel channel = connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.sync(); - try - { - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted", connection.getMethodRegistry()); - } - - final String exchangeName = body.getExchange().toString(); - - final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); - if(exchange == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(), - connection.getMethodRegistry()); - } - - virtualHost.removeExchange(exchange, !body.getIfUnused()); - - ExchangeDeleteOkBody responseBody = connection.getMethodRegistry().createExchangeDeleteOkBody(); - - connection.writeFrame(responseBody.generateFrame(channelId)); - } - - catch (ExchangeIsAlternateException e) - { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", - connection.getMethodRegistry()); - - } - catch (RequiredExchangeException e) - { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted", - connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); - } - } - - - protected boolean isDefaultExchange(final AMQShortString exchangeName) - { - return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/OnCurrentThreadExecutor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/OnCurrentThreadExecutor.java deleted file mode 100644 index 6ff511ea30..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/OnCurrentThreadExecutor.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.util.concurrent.Executor; - -/** - * An executor that executes the task on the current thread. - */ -public class OnCurrentThreadExecutor implements Executor -{ - public void execute(Runnable command) - { - command.run(); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java deleted file mode 100644 index c47a4b528f..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; -import java.util.Map; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> -{ - private static final Logger _log = Logger.getLogger(QueueBindHandler.class); - - private static final QueueBindHandler _instance = new QueueBindHandler(); - - public static QueueBindHandler getInstance() - { - return _instance; - } - - private QueueBindHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - QueueBindBody body, - int channelId) throws AMQException - { - VirtualHostImpl virtualHost = connection.getVirtualHost(); - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - final AMQQueue queue; - final AMQShortString routingKey; - - final AMQShortString queueName = body.getQueue(); - - if (queueName == null) - { - - queue = channel.getDefaultQueue(); - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null", - connection.getMethodRegistry()); - } - - if (body.getRoutingKey() == null) - { - routingKey = AMQShortString.valueOf(queue.getName()); - } - else - { - routingKey = body.getRoutingKey().intern(); - } - } - else - { - queue = virtualHost.getQueue(queueName.toString()); - routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern(); - } - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.", - connection.getMethodRegistry()); - } - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange", - connection.getMethodRegistry()); - } - - final String exchangeName = body.getExchange().toString(); - - final ExchangeImpl exch = virtualHost.getExchange(exchangeName); - if (exch == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.", - connection.getMethodRegistry()); - } - - - try - { - - Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments()); - String bindingKey = String.valueOf(routingKey); - - if (!exch.isBound(bindingKey, arguments, queue)) - { - - if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType())) - { - exch.replaceBinding(bindingKey, queue, arguments); - } - } - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); - } - - if (_log.isInfoEnabled()) - { - _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); - } - if (!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); - connection.writeFrame(responseBody.generateFrame(channelId)); - - } - } - - protected boolean isDefaultExchange(final AMQShortString exchangeName) - { - return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java deleted file mode 100644 index 0e1016c319..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; -import java.util.Map; -import java.util.UUID; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.virtualhost.QueueExistsException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> -{ - private static final Logger _logger = Logger.getLogger(QueueDeclareHandler.class); - - private static final QueueDeclareHandler _instance = new QueueDeclareHandler(); - - public static QueueDeclareHandler getInstance() - { - return _instance; - } - - public void methodReceived(final AMQProtocolSession<?> connection, - QueueDeclareBody body, - int channelId) throws AMQException - { - final AMQSessionModel session = connection.getChannel(channelId); - VirtualHostImpl virtualHost = connection.getVirtualHost(); - - final AMQShortString queueName; - - // if we aren't given a queue name, we create one which we return to the client - if ((body.getQueue() == null) || (body.getQueue().length() == 0)) - { - queueName = createName(); - } - else - { - queueName = body.getQueue().intern(); - } - - AMQQueue queue; - - //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - if(body.getPassive()) - { - queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry()); - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection.", - connection.getMethodRegistry()); - } - - //set this as the default queue on the channel: - channel.setDefaultQueue(queue); - } - } - else - { - - try - { - - queue = createQueue(channel, queueName, body, virtualHost, connection); - - } - catch(QueueExistsException qe) - { - - queue = qe.getExistingQueue(); - - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection.", - connection.getMethodRegistry()); - } - else if(queue.isExclusive() != body.getExclusive()) - { - - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: " - + queue.isExclusive() + " requested " + body.getExclusive() + ")", - connection.getMethodRegistry()); - } - else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) - || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: " - + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")", - connection.getMethodRegistry()); - } - else if(queue.isDurable() != body.getDurable()) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getName() + "' with different durability (was: " - + queue.isDurable() + " requested " + body.getDurable() + ")", - connection.getMethodRegistry()); - } - - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); - } - - //set this as the default queue on the channel: - channel.setDefaultQueue(queue); - } - - if (!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = connection.getMethodRegistry(); - QueueDeclareOkBody responseBody = - methodRegistry.createQueueDeclareOkBody(queueName, - queue.getQueueDepthMessages(), - queue.getConsumerCount()); - connection.writeFrame(responseBody.generateFrame(channelId)); - - _logger.info("Queue " + queueName + " declared successfully"); - } - } - - protected AMQShortString createName() - { - return new AMQShortString("tmp_" + UUID.randomUUID()); - } - - protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName, - QueueDeclareBody body, - final VirtualHostImpl virtualHost, - final AMQProtocolSession session) - throws AMQException, QueueExistsException - { - - final boolean durable = body.getDurable(); - final boolean autoDelete = body.getAutoDelete(); - final boolean exclusive = body.getExclusive(); - - - Map<String, Object> attributes = - QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); - final String queueNameString = AMQShortString.toString(queueName); - attributes.put(Queue.NAME, queueNameString); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.DURABLE, durable); - - LifetimePolicy lifetimePolicy; - ExclusivityPolicy exclusivityPolicy; - - if(exclusive) - { - lifetimePolicy = autoDelete - ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS - : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; - exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; - } - else - { - lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; - exclusivityPolicy = ExclusivityPolicy.NONE; - } - - attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); - attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); - - - final AMQQueue queue = virtualHost.createQueue(attributes); - - return queue; - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java deleted file mode 100644 index 123c076a25..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> -{ - private static final QueueDeleteHandler _instance = new QueueDeleteHandler(); - - public static QueueDeleteHandler getInstance() - { - return _instance; - } - - private final boolean _failIfNotFound; - - public QueueDeleteHandler() - { - this(true); - } - - public QueueDeleteHandler(boolean failIfNotFound) - { - _failIfNotFound = failIfNotFound; - - } - - public void methodReceived(final AMQProtocolSession<?> connection, - QueueDeleteBody body, - int channelId) throws AMQException - { - VirtualHostImpl virtualHost = connection.getVirtualHost(); - - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.sync(); - AMQQueue queue; - if (body.getQueue() == null) - { - - //get the default queue on the channel: - queue = channel.getDefaultQueue(); - } - else - { - queue = virtualHost.getQueue(body.getQueue().toString()); - } - - if (queue == null) - { - if (_failIfNotFound) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", - connection.getMethodRegistry()); - } - } - else - { - if (body.getIfEmpty() && !queue.isEmpty()) - { - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.", - connection.getMethodRegistry()); - } - else if (body.getIfUnused() && !queue.isUnused()) - { - // TODO - Error code - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.", - connection.getMethodRegistry()); - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection.", - connection.getMethodRegistry()); - } - - int purged = 0; - try - { - purged = virtualHost.removeQueue(queue); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); - } - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); - connection.writeFrame(responseBody.generateFrame(channelId)); - } - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java deleted file mode 100644 index 2c06fef1e2..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ - -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.QueuePurgeBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody> -{ - private static final QueuePurgeHandler _instance = new QueuePurgeHandler(); - - public static QueuePurgeHandler getInstance() - { - return _instance; - } - - private final boolean _failIfNotFound; - - public QueuePurgeHandler() - { - this(true); - } - - public QueuePurgeHandler(boolean failIfNotFound) - { - _failIfNotFound = failIfNotFound; - } - - public void methodReceived(final AMQProtocolSession<?> connection, - QueuePurgeBody body, - int channelId) throws AMQException - { - VirtualHostImpl virtualHost = connection.getVirtualHost(); - - AMQChannel channel = connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - AMQQueue queue; - if(body.getQueue() == null) - { - - //get the default queue on the channel: - queue = channel.getDefaultQueue(); - - if(queue == null) - { - if(_failIfNotFound) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.", connection.getMethodRegistry()); - } - } - } - else - { - queue = virtualHost.getQueue(body.getQueue().toString()); - } - - if(queue == null) - { - if(_failIfNotFound) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", - connection.getMethodRegistry()); - } - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue is exclusive, but not created on this Connection.", connection.getMethodRegistry()); - } - - long purged = 0; - try - { - purged = queue.clearQueue(); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); - } - - - if(!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); - connection.writeFrame(responseBody.generateFrame(channelId)); - - } - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java deleted file mode 100644 index 1b2d3c0653..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import java.security.AccessControlException; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.QueueUnbindBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody> -{ - private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class); - - private static final QueueUnbindHandler _instance = new QueueUnbindHandler(); - - public static QueueUnbindHandler getInstance() - { - return _instance; - } - - private QueueUnbindHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - QueueUnbindBody body, - int channelId) throws AMQException - { - - if (ProtocolVersion.v8_0.equals(connection.getProtocolVersion())) - { - // 0-8 does not support QueueUnbind - throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null); - } - - VirtualHostImpl virtualHost = connection.getVirtualHost(); - - final AMQQueue queue; - final AMQShortString routingKey; - - - AMQChannel channel = connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - if (body.getQueue() == null) - { - - queue = channel.getDefaultQueue(); - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null", - connection.getMethodRegistry()); - } - - routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false); - - } - else - { - queue = virtualHost.getQueue(body.getQueue().toString()); - routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false); - } - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", - connection.getMethodRegistry()); - } - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Cannot unbind the queue " - + queue.getName() - + " from the default exchange", connection.getMethodRegistry()); - } - - final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString()); - if (exch == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.", - connection.getMethodRegistry()); - } - - if(!exch.hasBinding(String.valueOf(routingKey), queue)) - { - throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding", connection.getMethodRegistry()); - } - else - { - try - { - exch.deleteBinding(String.valueOf(routingKey), queue); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); - } - } - - - if (_log.isInfoEnabled()) - { - _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); - } - - - final AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody(); - channel.sync(); - connection.writeFrame(responseBody.generateFrame(channelId)); - } - - protected boolean isDefaultExchange(final AMQShortString exchangeName) - { - return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); - } - -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java index ce735306ee..ada1b43ad2 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java @@ -20,92 +20,71 @@ */
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.UUID;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.AMQChannel;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.state.AMQState;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class ServerMethodDispatcherImpl implements MethodDispatcher
{
- private final AMQProtocolSession<?> _connection;
-
- private static interface DispatcherFactory
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection);
- }
-
- private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
- new HashMap<ProtocolVersion, DispatcherFactory>();
-
-
- static
- {
- _dispatcherFactories.put(ProtocolVersion.v8_0,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
- {
- return new ServerMethodDispatcherImpl_8_0(connection);
- }
- });
-
- _dispatcherFactories.put(ProtocolVersion.v0_9,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
- {
- return new ServerMethodDispatcherImpl_0_9(connection);
- }
- });
- _dispatcherFactories.put(ProtocolVersion.v0_91,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
- {
- return new ServerMethodDispatcherImpl_0_91(connection);
- }
- });
-
- }
-
-
- private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance();
- private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance();
- private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance();
- private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance();
- private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
- private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance();
- private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance();
- private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance();
- private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance();
- private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance();
- private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance();
- private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance();
- private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance();
- private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance();
- private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance();
- private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance();
- private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance();
- private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance();
- private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance();
- private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance();
- private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance();
- private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance();
- private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance();
- private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance();
- private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance();
- private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance();
- private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance();
- private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance();
- private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance();
+ private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class);
+ private final AMQProtocolSession<?> _connection;
public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return _dispatcherFactories.get(connection.getProtocolVersion()).createMethodDispatcher(connection);
+ return new ServerMethodDispatcherImpl(connection);
}
@@ -122,61 +101,618 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
{
- _accessRequestHandler.methodReceived(getConnection(), body, channelId);
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ if(ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()) )
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
+ }
+
+ // We don't implement access control class, but to keep clients happy that expect it
+ // always use the "0" ticket.
+ AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
+ channel.sync();
+ _connection.writeFrame(response.generateFrame(channelId));
return true;
}
public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
{
- _basicAckMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId);
+ }
+
+ final AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ // this method throws an AMQException if the delivery tag is not known
+ channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple());
return true;
}
public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
{
- _basicCancelMethodHandler.methodReceived(getConnection(), body, channelId);
+ final AMQChannel channel = _connection.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("BasicCancel: for:" + body.getConsumerTag() +
+ " nowait:" + body.getNowait());
+ }
+
+ channel.unsubscribeConsumer(body.getConsumerTag());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
+ channel.sync();
+ _connection.writeFrame(cancelOkBody.generateFrame(channelId));
+ }
return true;
}
public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
{
- _basicConsumeMethodHandler.methodReceived(getConnection(), body, channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+ VirtualHostImpl<?,?,?> vHost = _connection.getVirtualHost();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ else
+ {
+ channel.sync();
+ String queueName = body.getQueue() == null ? null : body.getQueue().asString();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("BasicConsume: from '" + queueName +
+ "' for:" + body.getConsumerTag() +
+ " nowait:" + body.getNowait() +
+ " args:" + body.getArguments());
+ }
+
+ MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName);
+ final Collection<MessageSource> sources = new HashSet<>();
+ if(queue != null)
+ {
+ sources.add(queue);
+ }
+ else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && body.getArguments() != null
+ && body.getArguments().get("x-multiqueue") instanceof Collection)
+ {
+ for(Object object : (Collection<Object>) body.getArguments().get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if(sourceName.length() != 0)
+ {
+ MessageSource source = vHost.getMessageSource(sourceName);
+ if(source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = body.getArguments().get("x-multiqueue").toString();
+ }
+
+ if (sources.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No queue for '" + queueName + "'");
+ }
+ if (queueName != null)
+ {
+ String msg = "No such queue, '" + queueName + "'";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry());
+ }
+ else
+ {
+ String msg = "No queue name provided, no default queue defined.";
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, _connection.getMethodRegistry());
+ }
+ }
+ else
+ {
+ final AMQShortString consumerTagName;
+
+ if (body.getConsumerTag() != null)
+ {
+ consumerTagName = body.getConsumerTag().intern(false);
+ }
+ else
+ {
+ consumerTagName = null;
+ }
+
+ try
+ {
+ if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
+ {
+
+ AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+ sources,
+ !body.getNoAck(),
+ body.getArguments(),
+ body.getExclusive(),
+ body.getNoLocal());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
+ else
+ {
+ AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg, // replytext
+ body.getClazz(),
+ body.getMethod());
+ _connection.writeFrame(responseBody.generateFrame(0));
+ }
+
+ }
+ catch (AMQInvalidArgumentException ise)
+ {
+ _logger.debug("Closing connection due to invalid selector");
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
+ AMQShortString.validValueOf(ise.getMessage()),
+ body.getClazz(),
+ body.getMethod());
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+
+ }
+ catch (AMQQueue.ExistingExclusiveConsumer e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an existing exclusive consumer",
+ _connection.getMethodRegistry());
+ }
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it already has a consumer",
+ _connection.getMethodRegistry());
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " permission denied", _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an incompatible exclusivity policy",
+ _connection.getMethodRegistry());
+ }
+
+ }
+ }
return true;
}
public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
{
- _basicGetMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+
+ AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ else
+ {
+ channel.sync();
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString());
+ if (queue == null)
+ {
+ _logger.info("No queue for '" + body.getQueue() + "'");
+ if(body.getQueue()!=null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
+ "No such queue, '" + body.getQueue() + "'",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.",
+ _connection.getMethodRegistry());
+ }
+ }
+ else
+ {
+
+ try
+ {
+ if (!performGet(queue, _connection, channel, !body.getNoAck()))
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+
+
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(), _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ExistingExclusiveConsumer e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an exclusive consumer",
+ _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
+ "The GET request has been evaluated as an exclusive consumer, " +
+ "this is likely due to a programming error in the Qpid broker",
+ _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an incompatible exclusivit policy",
+ _connection.getMethodRegistry());
+ }
+ }
+ }
return true;
}
+ public static boolean performGet(final AMQQueue queue,
+ final AMQProtocolSession session,
+ final AMQChannel channel,
+ final boolean acks)
+ throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
+ {
+
+ final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+ final GetDeliveryMethod getDeliveryMethod =
+ new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
+ final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
+ {
+ channel.addUnacknowledgedMessage(entry, deliveryTag, null);
+ }
+ };
+
+ ConsumerTarget_0_8 target;
+ EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES);
+ if(acks)
+ {
+
+ target = ConsumerTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+ else
+ {
+ target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+
+ ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ sub.flush();
+ sub.close();
+ return(getDeliveryMethod.hasDeliveredMessage());
+
+
+ }
+
+
+ private static class GetDeliveryMethod implements ClientDeliveryMethod
+ {
+
+ private final FlowCreditManager _singleMessageCredit;
+ private final AMQProtocolSession _session;
+ private final AMQChannel _channel;
+ private final AMQQueue _queue;
+ private boolean _deliveredMessage;
+
+ public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+ final AMQProtocolSession session,
+ final AMQChannel channel, final AMQQueue queue)
+ {
+ _singleMessageCredit = singleMessageCredit;
+ _session = session;
+ _channel = channel;
+ _queue = queue;
+ }
+
+ @Override
+ public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag)
+ {
+ _singleMessageCredit.useCreditForMessage(message.getSize());
+ long size =_session.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ _channel.getChannelId(),
+ deliveryTag,
+ _queue.getQueueDepthMessages());
+
+ _deliveredMessage = true;
+ return size;
+ }
+
+ public boolean hasDeliveredMessage()
+ {
+ return _deliveredMessage;
+ }
+ }
+
public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
{
- _basicPublishMethodHandler.methodReceived(getConnection(), body, channelId);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Publish received on channel " + channelId);
+ }
+
+ AMQShortString exchangeName = body.getExchange();
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+
+ // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
+
+ MessageDestination destination;
+
+ if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName))
+ {
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
+ }
+
+ // if the exchange does not exist we raise a channel exception
+ if (destination == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ // The partially populated BasicDeliver frame plus the received route body
+ // is stored in the channel. Once the final body frame has been received
+ // it is routed to the exchange.
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ MessagePublishInfo info = new MessagePublishInfo(body.getExchange(),
+ body.getImmediate(),
+ body.getMandatory(),
+ body.getRoutingKey());
+ info.setExchange(exchangeName);
+ try
+ {
+ channel.setPublishFrame(info, destination);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+ }
return true;
}
public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
{
- _basicQosHandler.methodReceived(getConnection(), body, channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+ channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
+
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
return true;
}
public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
{
- _basicRecoverMethodHandler.methodReceived(getConnection(), body, channelId);
+ _logger.debug("Recover received on protocol session " + _connection
+ + " and channel " + channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ channel.resend();
+
+ // Qpid 0-8 hacks a synchronous -ok onto recover.
+ // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
+ if(_connection.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ channel.sync();
+ _connection.writeFrame(recoverOk.generateFrame(channelId));
+
+ }
+
return true;
}
public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
{
- _basicRejectMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting:" + body.getDeliveryTag() +
+ ": Requeue:" + body.getRequeue() +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ long deliveryTag = body.getDeliveryTag();
+
+ MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+
+ if (message == null)
+ {
+ _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+ }
+ else
+ {
+
+ if (message.getMessage() == null)
+ {
+ _logger.warn("Message has already been purged, unable to Reject.");
+ }
+ else
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
+ ": Requeue:" + body.getRequeue() +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ if (body.getRequeue())
+ {
+ //this requeue represents a message rejected from the pre-dispatch queue
+ //therefore we need to amend the delivery counter.
+ message.decrementDeliveryCount();
+
+ channel.requeue(deliveryTag);
+ }
+ else
+ {
+ // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
+ // as it would prevent redelivery
+ // message.reject();
+
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: "
+ + maxDeliveryCountEnabled
+ + " deliveryTag "
+ + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: "
+ + deliveredTooManyTimes
+ + " deliveryTag "
+ + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ channel.requeue(deliveryTag);
+ }
+ }
+ }
+ }
return true;
}
public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
{
- _channelOpenHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ // Protect the broker against out of order frame request.
+ if (virtualHost == null)
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+ }
+ _logger.info("Connecting to: " + virtualHost.getName());
+
+ final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore());
+
+ _connection.addChannel(channel);
+
+ ChannelOpenOkBody response;
+
+
+ response = _connection.getMethodRegistry().createChannelOpenOkBody();
+
+
+ _connection.writeFrame(response.generateFrame(channelId));
return true;
}
@@ -186,20 +722,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body);
}
- @Override
- public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody body,
- final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
@@ -238,21 +760,64 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
{
- _channelCloseHandler.methodReceived(getConnection(), body, channelId);
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received channel close for id " + channelId
+ + " citing class " + body.getClassId() +
+ " and method " + body.getMethodId());
+ }
+
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR,
+ "Trying to close unknown channel",
+ _connection.getMethodRegistry());
+ }
+ channel.sync();
+ _connection.closeChannel(channelId);
+ // Client requested closure so we don't wait for ok we send it
+ _connection.closeChannelOk(channelId);
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
return true;
}
public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
{
- _channelCloseOkHandler.methodReceived(getConnection(), body, channelId);
+
+ _logger.info("Received channel-close-ok for channel-id " + channelId);
+
+ // Let the Protocol Session know the channel is now closed.
+ _connection.closeChannelOk(channelId);
return true;
}
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- _channelFlowHandler.methodReceived(getConnection(), body, channelId);
+ final AMQProtocolSession<?> connection = getConnection();
+
+
+ AMQChannel channel = connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
+ }
+ channel.sync();
+ channel.setSuspended(!body.getActive());
+ _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
+
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
+ connection.writeFrame(responseBody.generateFrame(channelId));
return true;
}
@@ -269,23 +834,103 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
{
- _connectionOpenMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ //ignore leading '/'
+ String virtualHostName;
+ if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
+ {
+ virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
+ }
+ else
+ {
+ virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
+ }
+
+ VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName);
+
+ if (virtualHost == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ // Check virtualhost access
+ if (virtualHost.getState() != State.ACTIVE)
+ {
+ throw body.getConnectionException(AMQConstant.CONNECTION_FORCED,
+ "Virtual host '" + virtualHost.getName() + "' is not active",
+ _connection.getMethodRegistry());
+ }
+
+ _connection.setVirtualHost(virtualHost);
+ try
+ {
+ virtualHost.getSecurityManager().authoriseCreateConnection(_connection);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ // See Spec (0.8.2). Section 3.1.2 Virtual Hosts
+ if (_connection.getContextKey() == null)
+ {
+ _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+
+ _connection.changeState(AMQState.CONNECTION_OPEN);
+
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
return true;
}
public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
{
- _connectionCloseMethodHandler.methodReceived(getConnection(), body, channelId);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
+ body.getReplyText() + " for " + _connection);
+ }
+ try
+ {
+ _connection.closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ _connection.closeProtocolSession();
+
return true;
}
public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
{
- _connectionCloseOkMethodHandler.methodReceived(
- getConnection(),
- body, channelId);
+ _logger.info("Received Connection-close-ok");
+
+ try
+ {
+ _connection.changeState(AMQState.CONNECTION_CLOSED);
+ _connection.closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
return true;
}
@@ -368,92 +1013,1246 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
{
- _connectionSecureOkMethodHandler.methodReceived(
- getConnection(),
- body, channelId);
+ Broker<?> broker = _connection.getBroker();
+
+ SubjectCreator subjectCreator = _connection.getSubjectCreator();
+
+ SaslServer ss = _connection.getSaslServer();
+ if (ss == null)
+ {
+ throw new AMQException("No SASL context set up in session");
+ }
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ // This should be abstracted
+ _connection.changeState(AMQState.CONNECTION_CLOSING);
+
+ ConnectionCloseBody connectionCloseBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ _connection.writeFrame(connectionCloseBody.generateFrame(0));
+ disposeSaslServer(_connection);
+ break;
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+ _connection.changeState(AMQState.CONNECTION_NOT_TUNED);
+
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if(frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody tuneBody =
+ methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ _connection.writeFrame(tuneBody.generateFrame(0));
+ _connection.setAuthorizedSubject(authResult.getSubject());
+ disposeSaslServer(_connection);
+ break;
+ case CONTINUE:
+ _connection.changeState(AMQState.CONNECTION_NOT_AUTH);
+
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ _connection.writeFrame(secureBody.generateFrame(0));
+ }
return true;
}
+ private void disposeSaslServer(AMQProtocolSession ps)
+ {
+ SaslServer ss = ps.getSaslServer();
+ if (ss != null)
+ {
+ ps.setSaslServer(null);
+ try
+ {
+ ss.dispose();
+ }
+ catch (SaslException e)
+ {
+ _logger.error("Error disposing of Sasl server: " + e);
+ }
+ }
+ }
+
public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
{
- _connectionStartOkMethodHandler.methodReceived(
- getConnection(),
- body, channelId);
+ Broker<?> broker = _connection.getBroker();
+
+ _logger.info("SASL Mechanism selected: " + body.getMechanism());
+ _logger.info("Locale selected: " + body.getLocale());
+
+ SubjectCreator subjectCreator = _connection.getSubjectCreator();
+ SaslServer ss = null;
+ try
+ {
+ ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
+ _connection.getLocalFQDN(),
+ _connection.getPeerPrincipal());
+
+ if (ss == null)
+ {
+ throw body.getConnectionException(AMQConstant.RESOURCE_ERROR,
+ "Unable to create SASL Server:" + body.getMechanism(),
+ _connection.getMethodRegistry());
+ }
+
+ _connection.setSaslServer(ss);
+
+ final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
+ //save clientProperties
+ _connection.setClientProperties(body.getClientProperties());
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ _connection.changeState(AMQState.CONNECTION_CLOSING);
+
+ ConnectionCloseBody closeBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ _connection.writeFrame(closeBody.generateFrame(0));
+ disposeSaslServer(_connection);
+ break;
+
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+ _connection.setAuthorizedSubject(authResult.getSubject());
+
+ _connection.changeState(AMQState.CONNECTION_NOT_TUNED);
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if(frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody
+ tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ _connection.writeFrame(tuneBody.generateFrame(0));
+ break;
+ case CONTINUE:
+ _connection.changeState(AMQState.CONNECTION_NOT_AUTH);
+
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ _connection.writeFrame(secureBody.generateFrame(0));
+ }
+ }
+ catch (SaslException e)
+ {
+ disposeSaslServer(_connection);
+ throw new AMQException("SASL error: " + e, e);
+ }
return true;
}
public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
{
- _connectionTuneOkMethodHandler.methodReceived(getConnection(), body, channelId);
+ final AMQProtocolSession<?> connection = getConnection();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(body);
+ }
+ connection.changeState(AMQState.CONNECTION_NOT_OPENED);
+
+ connection.initHeartbeats(body.getHeartbeat());
+
+ int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+ if(brokerFrameMax <= 0)
+ {
+ brokerFrameMax = Integer.MAX_VALUE;
+ }
+
+ if(body.getFrameMax() > (long) brokerFrameMax)
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + " greater than the broker will allow: "
+ + brokerFrameMax,
+ body.getClazz(), body.getMethod(),
+ connection.getMethodRegistry(),null);
+ }
+ else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + " which is smaller than the specification definined minimum: "
+ + AMQConstant.FRAME_MIN_SIZE.getCode(),
+ body.getClazz(), body.getMethod(),
+ connection.getMethodRegistry(),null);
+ }
+ int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
+ connection.setMaxFrameSize(frameMax);
+
+ long maxChannelNumber = body.getChannelMax();
+ //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+ connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
return true;
}
+ public static final int OK = 0;
+ public static final int EXCHANGE_NOT_FOUND = 1;
+ public static final int QUEUE_NOT_FOUND = 2;
+ public static final int NO_BINDINGS = 3;
+ public static final int QUEUE_NOT_BOUND = 4;
+ public static final int NO_QUEUE_BOUND_WITH_RK = 5;
+ public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
+
public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
{
- _exchangeBoundHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+
+
+ AMQShortString exchangeName = body.getExchange();
+ AMQShortString queueName = body.getQueue();
+ AMQShortString routingKey = body.getRoutingKey();
+ ExchangeBoundOkBody response;
+
+ if(isDefaultExchange(exchangeName))
+ {
+ if(routingKey == null)
+ {
+ if(queueName == null)
+ {
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null);
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
+ }
+ }
+ else
+ {
+ if(queueName == null)
+ {
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null);
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null);
+ }
+ }
+ }
+ }
+ else
+ {
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+ if (exchange == null)
+ {
+
+
+ response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+ AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found"));
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
+ null); // replyText
+ }
+ }
+ else
+ {
+
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
+ if (exchange.isBound(bindingKey, queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ String message = "Queue '" + queueName + "' not bound with routing key '" +
+ body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
+
+ response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ AMQShortString.validValueOf(message)); // replyText
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
+ AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() +
+ "' to exchange '" + exchangeName + "'")); // replyText
+ }
+ }
+ }
+ _connection.writeFrame(response.generateFrame(channelId));
return true;
}
public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
{
- _exchangeDeclareHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ final AMQShortString exchangeName = body.getExchange();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName);
+ }
+
+ ExchangeImpl exchange;
+
+ if(isDefaultExchange(exchangeName))
+ {
+ if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType()))
+ {
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+ + " of type "
+ + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ _connection.getMethodRegistry(),null);
+ }
+ }
+ else
+ {
+ if (body.getPassive())
+ {
+ exchange = virtualHost.getExchange(exchangeName.toString());
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName,
+ _connection.getMethodRegistry());
+ }
+ else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString()))
+ {
+
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
+ exchangeName + " of type " + exchange.getType()
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ _connection.getMethodRegistry(),null);
+ }
+
+ }
+ else
+ {
+ try
+ {
+ String name = exchangeName == null ? null : exchangeName.intern().toString();
+ String type = body.getType() == null ? null : body.getType().intern().toString();
+
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ if(body.getArguments() != null)
+ {
+ attributes.putAll(FieldTable.convertToMap(body.getArguments()));
+ }
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
+ {
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ }
+ exchange = virtualHost.createExchange(attributes);
+
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Attempt to declare exchange: " + exchangeName +
+ " which begins with reserved prefix.",
+ _connection.getMethodRegistry());
+
+ }
+ catch(ExchangeExistsException e)
+ {
+ exchange = e.getExistingExchange();
+ if(!new AMQShortString(exchange.getType()).equals(body.getType()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getType()
+ + " to " + body.getType() + ".",
+ _connection.getMethodRegistry());
+ }
+ }
+ catch(NoFactoryForTypeException e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID,
+ "Unknown exchange type '"
+ + e.getType()
+ + "' for exchange '"
+ + exchangeName
+ + "'",
+ _connection.getMethodRegistry());
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+ catch (UnknownConfiguredObjectException e)
+ {
+ // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
+ "Unknown alternate exchange "
+ + (e.getName() != null
+ ? "name: \"" + e.getName() + "\""
+ : "id: " + e.getId()),
+ _connection.getMethodRegistry());
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID,
+ "Error creating exchange '"
+ + exchangeName
+ + "': "
+ + e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+ }
+ }
+
+ if(!body.getNowait())
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ channel.sync();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
return true;
}
public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
{
- _exchangeDeleteHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+ try
+ {
+
+ if(isDefaultExchange(body.getExchange()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Default Exchange cannot be deleted",
+ _connection.getMethodRegistry());
+ }
+
+ final String exchangeName = body.getExchange().toString();
+
+ final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(),
+ _connection.getMethodRegistry());
+ }
+
+ virtualHost.removeExchange(exchange, !body.getIfUnused());
+
+ ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody();
+
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+
+ catch (ExchangeIsAlternateException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange",
+ _connection.getMethodRegistry());
+
+ }
+ catch (RequiredExchangeException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED,
+ "Exchange '" + body.getExchange() + "' cannot be deleted",
+ _connection.getMethodRegistry());
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
return true;
}
+ private boolean isDefaultExchange(final AMQShortString exchangeName)
+ {
+ return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
+ }
+
public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
{
- _queueBindHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+ final AMQShortString queueName = body.getQueue();
+
+ if (queueName == null)
+ {
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND,
+ "No default queue defined on channel and queue was null",
+ _connection.getMethodRegistry());
+ }
+
+ if (body.getRoutingKey() == null)
+ {
+ routingKey = AMQShortString.valueOf(queue.getName());
+ }
+ else
+ {
+ routingKey = body.getRoutingKey().intern();
+ }
+ }
+ else
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.",
+ _connection.getMethodRegistry());
+ }
+
+ if(isDefaultExchange(body.getExchange()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Cannot bind the queue " + queueName + " to the default exchange",
+ _connection.getMethodRegistry());
+ }
+
+ final String exchangeName = body.getExchange().toString();
+
+ final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.",
+ _connection.getMethodRegistry());
+ }
+
+
+ try
+ {
+
+ Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
+ String bindingKey = String.valueOf(routingKey);
+
+ if (!exch.isBound(bindingKey, arguments, queue))
+ {
+
+ if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType()))
+ {
+ exch.replaceBinding(bindingKey, queue, arguments);
+ }
+ }
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+ if (!body.getNowait())
+ {
+ channel.sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
return true;
}
public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
{
- _queueDeclareHandler.methodReceived(getConnection(), body, channelId);
+ final AMQSessionModel session = _connection.getChannel(channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ final AMQShortString queueName;
+
+ // if we aren't given a queue name, we create one which we return to the client
+ if ((body.getQueue() == null) || (body.getQueue().length() == 0))
+ {
+ queueName = new AMQShortString("tmp_" + UUID.randomUUID());
+ }
+ else
+ {
+ queueName = body.getQueue().intern();
+ }
+
+ AMQQueue queue;
+
+ //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ if(body.getPassive())
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry());
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+
+ //set this as the default queue on the channel:
+ channel.setDefaultQueue(queue);
+ }
+ }
+ else
+ {
+
+ try
+ {
+
+ queue = createQueue(channel, queueName, body, virtualHost, _connection);
+
+ }
+ catch(QueueExistsException qe)
+ {
+
+ queue = qe.getExistingQueue();
+
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+ else if(queue.isExclusive() != body.getExclusive())
+ {
+
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different exclusivity (was: "
+ + queue.isExclusive()
+ + " requested "
+ + body.getExclusive()
+ + ")",
+ _connection.getMethodRegistry());
+ }
+ else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different lifetime policy (was: "
+ + queue.getLifetimePolicy()
+ + " requested autodelete: "
+ + body.getAutoDelete()
+ + ")",
+ _connection.getMethodRegistry());
+ }
+ else if(queue.isDurable() != body.getDurable())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different durability (was: "
+ + queue.isDurable()
+ + " requested "
+ + body.getDurable()
+ + ")",
+ _connection.getMethodRegistry());
+ }
+
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ //set this as the default queue on the channel:
+ channel.setDefaultQueue(queue);
+ }
+
+ if (!body.getNowait())
+ {
+ channel.sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getQueueDepthMessages(),
+ queue.getConsumerCount());
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
return true;
}
+ protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName,
+ QueueDeclareBody body,
+ final VirtualHostImpl virtualHost,
+ final AMQProtocolSession session)
+ throws AMQException, QueueExistsException
+ {
+
+ final boolean durable = body.getDurable();
+ final boolean autoDelete = body.getAutoDelete();
+ final boolean exclusive = body.getExclusive();
+
+
+ Map<String, Object> attributes =
+ QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
+ final String queueNameString = AMQShortString.toString(queueName);
+ attributes.put(Queue.NAME, queueNameString);
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.DURABLE, durable);
+
+ LifetimePolicy lifetimePolicy;
+ ExclusivityPolicy exclusivityPolicy;
+
+ if(exclusive)
+ {
+ lifetimePolicy = autoDelete
+ ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+ }
+ else
+ {
+ lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+ exclusivityPolicy = ExclusivityPolicy.NONE;
+ }
+
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+ final AMQQueue queue = virtualHost.createQueue(attributes);
+
+ return queue;
+ }
+
public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
{
- _queueDeleteHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+ AMQQueue queue;
+ if (body.getQueue() == null)
+ {
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+ }
+ else
+ {
+ queue = virtualHost.getQueue(body.getQueue().toString());
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ _connection.getMethodRegistry());
+
+ }
+ else
+ {
+ if (body.getIfEmpty() && !queue.isEmpty())
+ {
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.",
+ _connection.getMethodRegistry());
+ }
+ else if (body.getIfUnused() && !queue.isUnused())
+ {
+ // TODO - Error code
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+
+ int purged = 0;
+ try
+ {
+ purged = virtualHost.removeQueue(queue);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
return true;
}
public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
{
- _queuePurgeHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ AMQQueue queue;
+ if(body.getQueue() == null)
+ {
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+
+ if(queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "No queue specified.",
+ _connection.getMethodRegistry());
+ }
+ }
+ else
+ {
+ queue = virtualHost.getQueue(body.getQueue().toString());
+ }
+
+ if(queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+
+ long purged = 0;
+ try
+ {
+ purged = queue.clearQueue();
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+
+ if(!body.getNowait())
+ {
+ channel.sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
return true;
}
- public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+ public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException
{
- _txCommitHandler.methodReceived(getConnection(), body, channelId);
+ try
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Commit received on channel " + channelId);
+ }
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.commit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }, true);
+
+
+
+ }
+ catch (AMQException e)
+ {
+ throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage(),
+ _connection.getMethodRegistry());
+ }
return true;
}
- public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+ public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException
{
- _txRollbackHandler.methodReceived(getConnection(), body, channelId);
+ try
+ {
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+
+
+ final MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
+ Runnable task = new Runnable()
+ {
+
+ public void run()
+ {
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ };
+
+ channel.rollback(task);
+
+ //Now resend all the unacknowledged messages back to the original subscribers.
+ //(Must be done after the TxnRollback-ok response).
+ // Why, are we not allowed to send messages back to client before the ok method?
+ channel.resend();
+
+ }
+ catch (AMQException e)
+ {
+ throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage(),
+ _connection.getMethodRegistry());
+ }
return true;
}
public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
{
- _txSelectHandler.methodReceived(getConnection(), body, channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ channel.setLocalTransactional();
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ return true;
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ final AMQProtocolSession<?> connection = getConnection();
+
+ _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
+ AMQChannel channel = connection.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
+ }
+ channel.sync();
+ channel.resend();
+
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ connection.writeFrame(recoverOk.generateFrame(channelId));
+
return true;
}
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
@Override
- public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException
+ public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
+ throws AMQException
{
- return false;
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
}
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ final AMQProtocolSession<?> connection = getConnection();
+
+ if (ProtocolVersion.v8_0.equals(connection.getProtocolVersion()))
+ {
+ // 0-8 does not support QueueUnbind
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null);
+ }
+
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+
+ AMQChannel channel = connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
+ }
+
+ if (body.getQueue() == null)
+ {
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND,
+ "No default queue defined on channel and queue was null",
+ connection.getMethodRegistry());
+ }
+
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false);
+
+ }
+ else
+ {
+ queue = virtualHost.getQueue(body.getQueue().toString());
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false);
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ connection.getMethodRegistry());
+ }
+
+ if(isDefaultExchange(body.getExchange()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Cannot unbind the queue "
+ + queue.getName()
+ + " from the default exchange", connection.getMethodRegistry());
+ }
+
+ final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.",
+ connection.getMethodRegistry());
+ }
+
+ if(!exch.hasBinding(String.valueOf(routingKey), queue))
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such binding", connection.getMethodRegistry());
+ }
+ else
+ {
+ try
+ {
+ exch.deleteBinding(String.valueOf(routingKey), queue);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ connection.getMethodRegistry());
+ }
+ }
+
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+
+
+ final AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody();
+ channel.sync();
+ connection.writeFrame(responseBody.generateFrame(channelId));
+ return true;
+ }
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java deleted file mode 100644 index d15d27fcd5..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java +++ /dev/null @@ -1,80 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.BasicRecoverSyncOkBody;
-import org.apache.qpid.framing.ChannelAlertBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-
-
-public class ServerMethodDispatcherImpl_0_9
- extends ServerMethodDispatcherImpl
- implements MethodDispatcher
-
-{
-
- private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
- BasicRecoverSyncMethodHandler.getInstance();
- private static final QueueUnbindHandler _queueUnbindHandler =
- QueueUnbindHandler.getInstance();
-
-
- public ServerMethodDispatcherImpl_0_9(AMQProtocolSession<?> connection)
- {
- super(connection);
- }
-
- public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId);
- return true;
- }
-
- public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
- {
- _queueUnbindHandler.methodReceived(getConnection(), body,channelId);
- return true;
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java deleted file mode 100644 index f0f387685a..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java +++ /dev/null @@ -1,79 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.BasicRecoverSyncOkBody;
-import org.apache.qpid.framing.ChannelAlertBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-
-public class ServerMethodDispatcherImpl_0_91
- extends ServerMethodDispatcherImpl
- implements MethodDispatcher
-
-{
-
- private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
- BasicRecoverSyncMethodHandler.getInstance();
- private static final QueueUnbindHandler _queueUnbindHandler =
- QueueUnbindHandler.getInstance();
-
-
- public ServerMethodDispatcherImpl_0_91(AMQProtocolSession<?> connection)
- {
- super(connection);
- }
-
- public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId);
- return true;
- }
-
- public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
- {
- _queueUnbindHandler.methodReceived(getConnection(), body,channelId);
- return true;
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java deleted file mode 100644 index 4b8531832b..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java +++ /dev/null @@ -1,71 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.BasicRecoverSyncOkBody;
-import org.apache.qpid.framing.ChannelAlertBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-public class ServerMethodDispatcherImpl_8_0
- extends ServerMethodDispatcherImpl
- implements MethodDispatcher
-{
- public ServerMethodDispatcherImpl_8_0(AMQProtocolSession<?> connection)
- {
- super(connection);
- }
-
- public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody queueUnbindOkBody, final int channelId)
- {
- return false;
- }
-
- @Override
- public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody body,
- final int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException
- {
- return false;
- }
-
- @Override
- public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java deleted file mode 100644 index cb08b1fd4f..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.TxCommitBody; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> -{ - private static final Logger _log = Logger.getLogger(TxCommitHandler.class); - - private static TxCommitHandler _instance = new TxCommitHandler(); - - public static TxCommitHandler getInstance() - { - return _instance; - } - - private TxCommitHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - TxCommitBody body, - final int channelId) throws AMQException - { - try - { - if (_log.isDebugEnabled()) - { - _log.debug("Commit received on channel " + channelId); - } - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.commit(new Runnable() - { - - @Override - public void run() - { - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody(); - connection.writeFrame(responseBody.generateFrame(channelId)); - } - }, true); - - - - } - catch (AMQException e) - { - throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage(), - connection.getMethodRegistry()); - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java deleted file mode 100644 index 08c1c2378b..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.TxRollbackBody; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody> -{ - private static TxRollbackHandler _instance = new TxRollbackHandler(); - - public static TxRollbackHandler getInstance() - { - return _instance; - } - - private TxRollbackHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - TxRollbackBody body, - final int channelId) throws AMQException - { - try - { - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - - - final MethodRegistry methodRegistry = connection.getMethodRegistry(); - final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody(); - - Runnable task = new Runnable() - { - - public void run() - { - connection.writeFrame(responseBody.generateFrame(channelId)); - } - }; - - channel.rollback(task); - - //Now resend all the unacknowledged messages back to the original subscribers. - //(Must be done after the TxnRollback-ok response). - // Why, are we not allowed to send messages back to client before the ok method? - channel.resend(); - - } - catch (AMQException e) - { - throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage(), - connection.getMethodRegistry()); - } - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java deleted file mode 100644 index d6ac194b09..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.framing.TxSelectOkBody; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> -{ - private static TxSelectHandler _instance = new TxSelectHandler(); - - public static TxSelectHandler getInstance() - { - return _instance; - } - - private TxSelectHandler() - { - } - - public void methodReceived(final AMQProtocolSession<?> connection, - TxSelectBody body, - int channelId) throws AMQException - { - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - - channel.setLocalTransactional(); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody(); - connection.writeFrame(responseBody.generateFrame(channelId)); - } -} |