summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-12 11:16:06 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-12 11:16:06 +0000
commitce16072eb7da6d02e7224380459a27a19770a5d5 (patch)
tree85043404b1214da737c7bea5701541ab2f0b282e
parenta6dadb00efda0b8a33e4cec9b51be475b9e1e078 (diff)
downloadqpid-python-ce16072eb7da6d02e7224380459a27a19770a5d5.tar.gz
Move channel methods into AMQChannel
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1631160 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java1417
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java1484
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java2
4 files changed, 1406 insertions, 1499 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b4f276a45a..4b0fc6fd02 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -47,14 +47,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
@@ -62,6 +56,7 @@ import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -69,6 +64,7 @@ import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -85,12 +81,18 @@ import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -102,12 +104,18 @@ import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.TransportException;
public class AMQChannel
implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
- AsyncAutoCommitTransaction.FutureRecorder
+ AsyncAutoCommitTransaction.FutureRecorder,
+ ChannelMethodProcessor
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -191,7 +199,6 @@ public class AMQChannel
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
- private ChannelMethodProcessor _channelMethodProcessor;
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
@@ -239,7 +246,52 @@ public class AMQChannel
return null;
}
});
- _channelMethodProcessor = new ChannelMethodProcessorImpl(this);
+
+ }
+
+ private boolean performGet(final AMQQueue queue,
+ final boolean acks)
+ throws MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
+ {
+
+ final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+ final GetDeliveryMethod getDeliveryMethod =
+ new GetDeliveryMethod(singleMessageCredit, queue);
+ final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final ConsumerImpl sub,
+ final MessageInstance entry,
+ final long deliveryTag)
+ {
+ addUnacknowledgedMessage(entry, deliveryTag, null);
+ }
+ };
+
+ ConsumerTarget_0_8 target;
+ EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES);
+ if (acks)
+ {
+
+ target = ConsumerTarget_0_8.createAckTarget(this,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+ else
+ {
+ target = ConsumerTarget_0_8.createGetNoAckTarget(this,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+
+ ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ sub.flush();
+ sub.close();
+ return getDeliveryMethod.hasDeliveredMessage();
+
}
@@ -1289,9 +1341,39 @@ public class AMQChannel
return _subject;
}
- public ChannelMethodProcessor getMethodProcessor()
+ private class GetDeliveryMethod implements ClientDeliveryMethod
{
- return _channelMethodProcessor;
+
+ private final FlowCreditManager _singleMessageCredit;
+ private final AMQQueue _queue;
+ private boolean _deliveredMessage;
+
+ public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+ final AMQQueue queue)
+ {
+ _singleMessageCredit = singleMessageCredit;
+ _queue = queue;
+ }
+
+ @Override
+ public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag)
+ {
+ _singleMessageCredit.useCreditForMessage(message.getSize());
+ long size = _connection.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ AMQChannel.this.getChannelId(),
+ deliveryTag,
+ _queue.getQueueDepthMessages());
+
+ _deliveredMessage = true;
+ return size;
+ }
+
+ public boolean hasDeliveredMessage()
+ {
+ return _deliveredMessage;
+ }
}
@@ -1786,4 +1868,1313 @@ public class AMQChannel
return 0L;
}
}
+
+ @Override
+ public void receiveAccessRequest(final AMQShortString realm,
+ final boolean exclusive,
+ final boolean passive,
+ final boolean active, final boolean write, final boolean read)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()))
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "AccessRequest not present in AMQP versions other than 0-8, 0-9",
+ _channelId);
+ }
+ else
+ {
+ // We don't implement access control class, but to keep clients happy that expect it
+ // always use the "0" ticket.
+ AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
+ sync();
+ _connection.writeFrame(response.generateFrame(_channelId));
+ }
+ }
+
+ @Override
+ public void receiveBasicAck(final long deliveryTag, final boolean multiple)
+ {
+ acknowledgeMessage(deliveryTag, multiple);
+ }
+
+ @Override
+ public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait)
+ {
+ unsubscribeConsumer(consumerTag);
+ if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+ sync();
+ _connection.writeFrame(cancelOkBody.generateFrame(_channelId));
+ }
+ }
+
+ @Override
+ public void receiveBasicConsume(final AMQShortString queue,
+ final AMQShortString consumerTag,
+ final boolean noLocal,
+ final boolean noAck,
+ final boolean exclusive, final boolean nowait, final FieldTable arguments)
+ {
+ AMQShortString consumerTag1 = consumerTag;
+ VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost();
+ sync();
+ String queueName = queue == null ? null : queue.asString();
+
+ MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName);
+ final Collection<MessageSource> sources = new HashSet<>();
+ if (queue1 != null)
+ {
+ sources.add(queue1);
+ }
+ else if (vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && arguments != null
+ && arguments.get("x-multiqueue") instanceof Collection)
+ {
+ for (Object object : (Collection<Object>) arguments.get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if (sourceName.length() != 0)
+ {
+ MessageSource source = vHost.getMessageSource(sourceName);
+ if (source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = arguments.get("x-multiqueue").toString();
+ }
+
+ if (sources.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No queue for '" + queueName + "'");
+ }
+ if (queueName != null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.", _channelId);
+ }
+ }
+ else
+ {
+ try
+ {
+ consumerTag1 = consumeFromSource(consumerTag1,
+ sources,
+ !noAck,
+ arguments,
+ exclusive,
+ noLocal);
+ if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1);
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+
+ }
+ }
+ catch (ConsumerTagInUseException cte)
+ {
+
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Non-unique consumer tag, '" + consumerTag1
+ + "'", _channelId);
+ }
+ catch (AMQInvalidArgumentException ise)
+ {
+ _connection.closeConnection(AMQConstant.ARGUMENT_INVALID, ise.getMessage(), _channelId);
+
+
+ }
+ catch (AMQQueue.ExistingExclusiveConsumer e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue1.getName()
+ + " as it already has an existing exclusive consumer", _channelId);
+
+ }
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue1.getName()
+ + " exclusively as it already has a consumer", _channelId);
+
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
+ + queue1.getName()
+ + " permission denied", _channelId);
+
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue1.getName()
+ + " as it already has an incompatible exclusivity policy", _channelId);
+
+ }
+
+ }
+ }
+
+ @Override
+ public void receiveBasicGet(final AMQShortString queueName, final boolean noAck)
+ {
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+ sync();
+ AMQQueue queue = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ _logger.info("No queue for '" + queueName + "'");
+ if (queueName != null)
+ {
+ _connection.closeConnection(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId);
+
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.", _channelId);
+
+ }
+ }
+ else
+ {
+
+ try
+ {
+ if (!performGet(queue, !noAck))
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+ }
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), _channelId);
+ }
+ catch (MessageSource.ExistingExclusiveConsumer e)
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer", _channelId);
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ _connection.closeConnection(AMQConstant.INTERNAL_ERROR,
+ "The GET request has been evaluated as an exclusive consumer, " +
+ "this is likely due to a programming error in the Qpid broker", _channelId);
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Queue has an incompatible exclusivity policy", _channelId);
+ }
+ }
+ }
+
+ @Override
+ public void receiveBasicPublish(final AMQShortString exchangeName,
+ final AMQShortString routingKey,
+ final boolean mandatory,
+ final boolean immediate)
+ {
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+
+ MessageDestination destination;
+
+ if (isDefaultExchange(exchangeName))
+ {
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
+ }
+
+ // if the exchange does not exist we raise a channel exception
+ if (destination == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
+ }
+ else
+ {
+
+ MessagePublishInfo info = new MessagePublishInfo(exchangeName,
+ immediate,
+ mandatory,
+ routingKey);
+
+ try
+ {
+ setPublishFrame(info, destination);
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+ }
+ }
+
+ @Override
+ public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
+ {
+ sync();
+ setCredit(prefetchSize, prefetchCount);
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ }
+
+ @Override
+ public void receiveBasicRecover(final boolean requeue, final boolean sync)
+ {
+ resend();
+
+ if (sync)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ sync();
+ _connection.writeFrame(recoverOk.generateFrame(getChannelId()));
+
+ }
+
+ }
+
+ @Override
+ public void receiveBasicReject(final long deliveryTag, final boolean requeue)
+ {
+ MessageInstance message = getUnacknowledgedMessageMap().get(deliveryTag);
+
+ if (message == null)
+ {
+ _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+ }
+ else
+ {
+
+ if (message.getMessage() == null)
+ {
+ _logger.warn("Message has already been purged, unable to Reject.");
+ }
+ else
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting: DT:" + deliveryTag
+ + "-" + message.getMessage() +
+ ": Requeue:" + requeue
+ +
+ " on channel:" + debugIdentity());
+ }
+
+ if (requeue)
+ {
+ //this requeue represents a message rejected from the pre-dispatch queue
+ //therefore we need to amend the delivery counter.
+ message.decrementDeliveryCount();
+
+ requeue(deliveryTag);
+ }
+ else
+ {
+ // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
+ // as it would prevent redelivery
+ // message.reject();
+
+ final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: "
+ + maxDeliveryCountEnabled
+ + " deliveryTag "
+ + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: "
+ + deliveredTooManyTimes
+ + " deliveryTag "
+ + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ deadLetter(deliveryTag);
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ requeue(deliveryTag);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void receiveChannelClose()
+ {
+ sync();
+ _connection.closeChannel(this);
+
+ _connection.writeFrame(new AMQFrame(getChannelId(),
+ _connection.getMethodRegistry().createChannelCloseOkBody()));
+ }
+
+ @Override
+ public void receiveChannelCloseOk()
+ {
+ _connection.closeChannelOk(getChannelId());
+ }
+
+ @Override
+ public void receiveChannelFlow(final boolean active)
+ {
+ sync();
+ setSuspended(!active);
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(active);
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ }
+
+ @Override
+ public void receiveExchangeBound(final AMQShortString exchangeName,
+ final AMQShortString queueName,
+ final AMQShortString routingKey)
+ {
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ sync();
+
+ int replyCode;
+ String replyText;
+
+ if (isDefaultExchange(exchangeName))
+ {
+ if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ replyCode = virtualHost.getQueues().isEmpty()
+ ? ExchangeBoundOkBody.NO_BINDINGS
+ : ExchangeBoundOkBody.OK;
+ replyText = null;
+
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+ replyText = "Queue '" + queueName + "' not found";
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ }
+ }
+ else
+ {
+ if (queueName == null)
+ {
+ replyCode = virtualHost.getQueue(routingKey.toString()) == null
+ ? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK
+ : ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+ replyText = "Queue '" + queueName + "' not found";
+ }
+ else
+ {
+ replyCode = queueName.equals(routingKey)
+ ? ExchangeBoundOkBody.OK
+ : ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+ replyText = null;
+ }
+ }
+ }
+ }
+ else
+ {
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+ if (exchange == null)
+ {
+
+ replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND;
+ replyText = "Exchange '" + exchangeName + "' not found";
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.NO_BINDINGS;
+ replyText = null;
+ }
+ }
+ else
+ {
+
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+ replyText = "Queue '" + queueName + "' not found";
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_BOUND;
+ replyText = "Queue '"
+ + queueName
+ + "' not bound to exchange '"
+ + exchangeName
+ + "'";
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+ replyText = "Queue '" + queueName + "' not found";
+ }
+ else
+ {
+ String bindingKey = routingKey == null ? null : routingKey.asString();
+ if (exchange.isBound(bindingKey, queue))
+ {
+
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+ replyText = "Queue '" + queueName + "' not bound with routing key '" +
+ routingKey + "' to exchange '" + exchangeName + "'";
+
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(routingKey == null ? "" : routingKey.asString()))
+ {
+
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK;
+ replyText =
+ "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
+ }
+ }
+ }
+
+ ExchangeBoundOkBody exchangeBoundOkBody =
+ methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText));
+
+ _connection.writeFrame(exchangeBoundOkBody.generateFrame(getChannelId()));
+
+ }
+
+ @Override
+ public void receiveExchangeDeclare(final AMQShortString exchangeName,
+ final AMQShortString type,
+ final boolean passive,
+ final boolean durable,
+ final boolean autoDelete,
+ final boolean internal,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ ExchangeImpl exchange;
+ VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost();
+ if (isDefaultExchange(exchangeName))
+ {
+ if (!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+ + " of type "
+ + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ + " to " + type + ".", getChannelId());
+ }
+ else if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ sync();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+
+ }
+ else
+ {
+ if (passive)
+ {
+ exchange = virtualHost.getExchange(exchangeName.toString());
+ if (exchange == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ }
+ else if (!(type == null || type.length() == 0) && !exchange.getType().equals(type.asString()))
+ {
+
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ +
+ exchangeName
+ + " of type "
+ + exchange.getType()
+ + " to "
+ + type
+ + ".", getChannelId());
+ }
+ else if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ sync();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+
+ }
+ else
+ {
+ try
+ {
+ String name = exchangeName == null ? null : exchangeName.intern().toString();
+ String typeString = type == null ? null : type.intern().toString();
+
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ if (arguments != null)
+ {
+ attributes.putAll(FieldTable.convertToMap(arguments));
+ }
+ attributes.put(Exchange.ID, null);
+ attributes.put(Exchange.NAME, name);
+ attributes.put(Exchange.TYPE, typeString);
+ attributes.put(Exchange.DURABLE, durable);
+ attributes.put(Exchange.LIFETIME_POLICY,
+ autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ if (!attributes.containsKey(Exchange.ALTERNATE_EXCHANGE))
+ {
+ attributes.put(Exchange.ALTERNATE_EXCHANGE, null);
+ }
+ exchange = virtualHost.createExchange(attributes);
+
+ if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ sync();
+ _connection.writeFrame(responseBody.generateFrame(
+ getChannelId()));
+ }
+
+ }
+ catch (ReservedExchangeNameException e)
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Attempt to declare exchange: " + exchangeName +
+ " which begins with reserved prefix.", getChannelId());
+
+
+ }
+ catch (ExchangeExistsException e)
+ {
+ exchange = e.getExistingExchange();
+ if (!new AMQShortString(exchange.getType()).equals(type))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getType()
+ + " to " + type + ".", getChannelId());
+
+ }
+ }
+ catch (NoFactoryForTypeException e)
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"
+ + e.getType()
+ + "' for exchange '"
+ + exchangeName
+ + "'", getChannelId());
+
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+ catch (UnknownConfiguredObjectException e)
+ {
+ // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+ final String message = "Unknown alternate exchange "
+ + (e.getName() != null
+ ? "name: \"" + e.getName() + "\""
+ : "id: " + e.getId());
+ _connection.closeConnection(AMQConstant.NOT_FOUND, message, getChannelId());
+
+ }
+ catch (IllegalArgumentException e)
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Error creating exchange '"
+ + exchangeName
+ + "': "
+ + e.getMessage(), getChannelId());
+
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait)
+ {
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ sync();
+ try
+ {
+
+ if (isDefaultExchange(exchangeStr))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Default Exchange cannot be deleted", getChannelId());
+
+ }
+
+ else
+ {
+ final String exchangeName = exchangeStr.toString();
+
+ final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+ if (exchange == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "No such exchange: " + exchangeStr);
+ }
+ else
+ {
+ virtualHost.removeExchange(exchange, !ifUnused);
+
+ ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody();
+
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+ }
+ }
+ catch (ExchangeIsAlternateException e)
+ {
+ closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ }
+ catch (RequiredExchangeException e)
+ {
+ closeChannel(AMQConstant.NOT_ALLOWED,
+ "Exchange '" + exchangeStr + "' cannot be deleted");
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ }
+ }
+
+ @Override
+ public void receiveQueueBind(final AMQShortString queueName,
+ final AMQShortString exchange,
+ AMQShortString routingKey,
+ final boolean nowait,
+ final FieldTable argumentsTable)
+ {
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ AMQQueue<?> queue;
+ if (queueName == null)
+ {
+
+ queue = getDefaultQueue();
+
+ if (queue != null)
+ {
+ if (routingKey == null)
+ {
+ routingKey = AMQShortString.valueOf(queue.getName());
+ }
+ else
+ {
+ routingKey = routingKey.intern();
+ }
+ }
+ }
+ else
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey.intern();
+ }
+
+ if (queue == null)
+ {
+ String message = queueName == null
+ ? "No default queue defined on channel and queue was null"
+ : "Queue " + queueName + " does not exist.";
+ closeChannel(AMQConstant.NOT_FOUND, message);
+ }
+ else if (isDefaultExchange(exchange))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Cannot bind the queue " + queueName + " to the default exchange", getChannelId());
+
+ }
+ else
+ {
+
+ final String exchangeName = exchange.toString();
+
+ final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
+ if (exch == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND,
+ "Exchange " + exchangeName + " does not exist.");
+ }
+ else
+ {
+
+ try
+ {
+
+ Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable);
+ String bindingKey = String.valueOf(routingKey);
+
+ if (!exch.isBound(bindingKey, arguments, queue))
+ {
+
+ if (!exch.addBinding(bindingKey, queue, arguments)
+ && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(
+ exch.getType()))
+ {
+ exch.replaceBinding(bindingKey, queue, arguments);
+ }
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue "
+ + queue
+ + " to exchange "
+ + exch
+ + " with routing key "
+ + routingKey);
+ }
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ }
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void receiveQueueDeclare(final AMQShortString queueStr,
+ final boolean passive,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ final AMQShortString queueName;
+
+ // if we aren't given a queue name, we create one which we return to the client
+ if ((queueStr == null) || (queueStr.length() == 0))
+ {
+ queueName = new AMQShortString("tmp_" + UUID.randomUUID());
+ }
+ else
+ {
+ queueName = queueStr.intern();
+ }
+
+ AMQQueue queue;
+
+ //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+
+ if (passive)
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND,
+ "Queue: "
+ + queueName
+ + " not found on VirtualHost("
+ + virtualHost
+ + ").");
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(this))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.", getChannelId());
+ }
+ else
+ {
+ //set this as the default queue on the channel:
+ setDefaultQueue(queue);
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getQueueDepthMessages(),
+ queue.getConsumerCount());
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
+ }
+ }
+ }
+ else
+ {
+
+ try
+ {
+ Map<String, Object> attributes =
+ QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments));
+ final String queueNameString = AMQShortString.toString(queueName);
+ attributes.put(Queue.NAME, queueNameString);
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.DURABLE, durable);
+
+ LifetimePolicy lifetimePolicy;
+ ExclusivityPolicy exclusivityPolicy;
+
+ if (exclusive)
+ {
+ lifetimePolicy = autoDelete
+ ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+ }
+ else
+ {
+ lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+ exclusivityPolicy = ExclusivityPolicy.NONE;
+ }
+
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+ queue = virtualHost.createQueue(attributes);
+
+ setDefaultQueue(queue);
+
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getQueueDepthMessages(),
+ queue.getConsumerCount());
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
+ }
+ catch (QueueExistsException qe)
+ {
+
+ queue = qe.getExistingQueue();
+
+ if (!queue.verifySessionAccess(this))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.", getChannelId());
+
+ }
+ else if (queue.isExclusive() != exclusive)
+ {
+
+ closeChannel(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different exclusivity (was: "
+ + queue.isExclusive()
+ + " requested "
+ + exclusive
+ + ")");
+ }
+ else if ((autoDelete
+ && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ || (!autoDelete && queue.getLifetimePolicy() != ((exclusive
+ && !durable)
+ ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ : LifetimePolicy.PERMANENT)))
+ {
+ closeChannel(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different lifetime policy (was: "
+ + queue.getLifetimePolicy()
+ + " requested autodelete: "
+ + autoDelete
+ + ")");
+ }
+ else if (queue.isDurable() != durable)
+ {
+ closeChannel(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different durability (was: "
+ + queue.isDurable()
+ + " requested "
+ + durable
+ + ")");
+ }
+ else
+ {
+ setDefaultQueue(queue);
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getQueueDepthMessages(),
+ queue.getConsumerCount());
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
+ }
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ }
+
+ }
+ }
+
+ @Override
+ public void receiveQueueDelete(final AMQShortString queueName,
+ final boolean ifUnused,
+ final boolean ifEmpty,
+ final boolean nowait)
+ {
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ sync();
+ AMQQueue queue;
+ if (queueName == null)
+ {
+
+ //get the default queue on the channel:
+ queue = getDefaultQueue();
+ }
+ else
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ }
+
+ if (queue == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+
+ }
+ else
+ {
+ if (ifEmpty && !queue.isEmpty())
+ {
+ closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is not empty.");
+ }
+ else if (ifUnused && !queue.isUnused())
+ {
+ // TODO - Error code
+ closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is still used.");
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(this))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.", getChannelId());
+
+ }
+ else
+ {
+ try
+ {
+ int purged = virtualHost.removeQueue(queue);
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait)
+ {
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ AMQQueue queue = null;
+ if (queueName == null && (queue = getDefaultQueue()) == null)
+ {
+
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "No queue specified.", getChannelId());
+ }
+ else if ((queueName != null) && (queue = virtualHost.getQueue(queueName.toString())) == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+ }
+ else if (!queue.verifySessionAccess(this))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.", getChannelId());
+ }
+ else
+ {
+ try
+ {
+ long purged = queue.clearQueue();
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ }
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+
+ }
+ }
+
+ @Override
+ public void receiveQueueUnbind(final AMQShortString queueName,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final FieldTable arguments)
+ {
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+
+ final boolean useDefaultQueue = queueName == null;
+ final AMQQueue queue = useDefaultQueue
+ ? getDefaultQueue()
+ : virtualHost.getQueue(queueName.toString());
+
+
+ if (queue == null)
+ {
+ String message = useDefaultQueue
+ ? "No default queue defined on channel and queue was null"
+ : "Queue " + queueName + " does not exist.";
+ closeChannel(AMQConstant.NOT_FOUND, message);
+ }
+ else if (isDefaultExchange(exchange))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue "
+ + queue.getName()
+ + " from the default exchange", getChannelId());
+
+ }
+ else
+ {
+
+ final ExchangeImpl exch = virtualHost.getExchange(exchange.toString());
+
+ if (exch == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchange + " does not exist.");
+ }
+ else if (!exch.hasBinding(String.valueOf(routingKey), queue))
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "No such binding");
+ }
+ else
+ {
+ try
+ {
+ exch.deleteBinding(String.valueOf(routingKey), queue);
+
+ final AMQMethodBody responseBody = _connection.getMethodRegistry().createQueueUnbindOkBody();
+ sync();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+ }
+
+ }
+ }
+
+ @Override
+ public void receiveTxSelect()
+ {
+ setLocalTransactional();
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+
+ }
+
+ @Override
+ public void receiveTxCommit()
+ {
+ if (!isTransactional())
+ {
+ closeChannel(AMQConstant.COMMAND_INVALID,
+ "Fatal error: commit called on non-transactional channel");
+ }
+ commit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+ }
+ }, true);
+
+ }
+
+ @Override
+ public void receiveTxRollback()
+ {
+ if (!isTransactional())
+ {
+ closeChannel(AMQConstant.COMMAND_INVALID,
+ "Fatal error: rollback called on non-transactional channel");
+ }
+
+ final MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
+ Runnable task = new Runnable()
+ {
+
+ public void run()
+ {
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+ }
+ };
+
+ rollback(task);
+
+ //Now resend all the unacknowledged messages back to the original subscribers.
+ //(Must be done after the TxnRollback-ok response).
+ // Why, are we not allowed to send messages back to client before the ok method?
+ resend();
+ }
+
+
+ private void closeChannel(final AMQConstant cause, final String message)
+ {
+ _connection.closeChannelAndWriteFrame(this, cause, message);
+ }
+
+
+ private boolean isDefaultExchange(final AMQShortString exchangeName)
+ {
+ return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName);
+ }
+
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java
deleted file mode 100644
index 5e55d24b92..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java
+++ /dev/null
@@ -1,1484 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import java.security.AccessControlException;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AccessRequestOkBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.BasicGetEmptyBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MessagePublishInfo;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxSelectOkBody;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.filter.AMQInvalidArgumentException;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.ExclusivityPolicy;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.NoFactoryForTypeException;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.UnknownConfiguredObjectException;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.virtualhost.ExchangeExistsException;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
-import org.apache.qpid.server.virtualhost.QueueExistsException;
-import org.apache.qpid.server.virtualhost.RequiredExchangeException;
-import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class ChannelMethodProcessorImpl implements ChannelMethodProcessor
-{
- private static final Logger _logger = Logger.getLogger(ChannelMethodProcessorImpl.class);
-
- private final AMQChannel _channel;
- private final AMQProtocolEngine _connection;
-
- public ChannelMethodProcessorImpl(final AMQChannel channel)
- {
- _channel = channel;
- _connection = _channel.getConnection();
- }
-
- @Override
- public void receiveAccessRequest(final AMQShortString realm,
- final boolean exclusive,
- final boolean passive,
- final boolean active,
- final boolean write,
- final boolean read)
- {
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
- if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()))
- {
- closeConnection(AMQConstant.COMMAND_INVALID,
- "AccessRequest not present in AMQP versions other than 0-8, 0-9");
- }
- else
- {
- // We don't implement access control class, but to keep clients happy that expect it
- // always use the "0" ticket.
- AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
- _channel.sync();
- _connection.writeFrame(response.generateFrame(getChannelId()));
- }
- }
-
- @Override
- public void receiveBasicAck(final long deliveryTag, final boolean multiple)
- {
- _channel.acknowledgeMessage(deliveryTag, multiple);
- }
-
- @Override
- public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait)
- {
- _channel.unsubscribeConsumer(consumerTag);
- if (!nowait)
- {
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
- _channel.sync();
- _connection.writeFrame(cancelOkBody.generateFrame(getChannelId()));
- }
- }
-
- @Override
- public void receiveBasicConsume(final AMQShortString queueNameStr,
- AMQShortString consumerTag,
- final boolean noLocal,
- final boolean noAck,
- final boolean exclusive,
- final boolean nowait,
- final FieldTable arguments)
- {
- VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost();
- _channel.sync();
- String queueName = queueNameStr == null ? null : queueNameStr.asString();
-
- MessageSource queue = queueName == null ? _channel.getDefaultQueue() : vHost.getQueue(queueName);
- final Collection<MessageSource> sources = new HashSet<>();
- if (queue != null)
- {
- sources.add(queue);
- }
- else if (vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
- && arguments != null
- && arguments.get("x-multiqueue") instanceof Collection)
- {
- for (Object object : (Collection<Object>) arguments.get("x-multiqueue"))
- {
- String sourceName = String.valueOf(object);
- sourceName = sourceName.trim();
- if (sourceName.length() != 0)
- {
- MessageSource source = vHost.getMessageSource(sourceName);
- if (source == null)
- {
- sources.clear();
- break;
- }
- else
- {
- sources.add(source);
- }
- }
- }
- queueName = arguments.get("x-multiqueue").toString();
- }
-
- if (sources.isEmpty())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("No queue for '" + queueName + "'");
- }
- if (queueName != null)
- {
- closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
- }
- else
- {
- closeConnection(AMQConstant.NOT_ALLOWED, "No queue name provided, no default queue defined.");
- }
- }
- else
- {
- try
- {
- consumerTag = _channel.consumeFromSource(consumerTag,
- sources,
- !noAck,
- arguments,
- exclusive,
- noLocal);
- if (!nowait)
- {
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
-
- }
- }
- catch (ConsumerTagInUseException cte)
- {
-
- closeConnection(AMQConstant.NOT_ALLOWED, "Non-unique consumer tag, '" + consumerTag + "'");
- }
- catch (AMQInvalidArgumentException ise)
- {
- closeConnection(AMQConstant.ARGUMENT_INVALID, ise.getMessage());
-
-
- }
- catch (AMQQueue.ExistingExclusiveConsumer e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
- + queue.getName()
- + " as it already has an existing exclusive consumer");
-
- }
- catch (AMQQueue.ExistingConsumerPreventsExclusive e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
- + queue.getName()
- + " exclusively as it already has a consumer");
-
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
- + queue.getName()
- + " permission denied");
-
- }
- catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
- + queue.getName()
- + " as it already has an incompatible exclusivity policy");
-
- }
-
- }
- }
-
- @Override
- public void receiveBasicGet(final AMQShortString queueName, final boolean noAck)
- {
- VirtualHostImpl vHost = _connection.getVirtualHost();
- _channel.sync();
- AMQQueue queue =
- queueName == null ? _channel.getDefaultQueue() : vHost.getQueue(queueName.toString());
- if (queue == null)
- {
- _logger.info("No queue for '" + queueName + "'");
- if (queueName != null)
- {
- closeConnection(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
-
- }
- else
- {
- closeConnection(AMQConstant.NOT_ALLOWED, "No queue name provided, no default queue defined.");
-
- }
- }
- else
- {
-
- try
- {
- if (!performGet(queue, _connection, _channel, !noAck))
- {
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
- BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
-
-
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
- }
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
- }
- catch (MessageSource.ExistingExclusiveConsumer e)
- {
- closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer");
- }
- catch (MessageSource.ExistingConsumerPreventsExclusive e)
- {
- closeConnection(AMQConstant.INTERNAL_ERROR,
- "The GET request has been evaluated as an exclusive consumer, " +
- "this is likely due to a programming error in the Qpid broker");
- }
- catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
- {
- closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an incompatible exclusivity policy");
- }
- }
- }
-
- @Override
- public void receiveBasicPublish(final AMQShortString exchangeName,
- final AMQShortString routingKey,
- final boolean mandatory,
- final boolean immediate)
- {
- VirtualHostImpl vHost = _connection.getVirtualHost();
-
- MessageDestination destination;
-
- if (isDefaultExchange(exchangeName))
- {
- destination = vHost.getDefaultDestination();
- }
- else
- {
- destination = vHost.getMessageDestination(exchangeName.toString());
- }
-
- // if the exchange does not exist we raise a channel exception
- if (destination == null)
- {
- closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
- }
- else
- {
-
- MessagePublishInfo info = new MessagePublishInfo(exchangeName,
- immediate,
- mandatory,
- routingKey);
-
- try
- {
- _channel.setPublishFrame(info, destination);
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
-
- }
- }
- }
-
- @Override
- public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
- {
- _channel.sync();
- _channel.setCredit(prefetchSize, prefetchCount);
-
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
-
- }
-
- @Override
- public void receiveBasicRecover(final boolean requeue, final boolean sync)
- {
- _channel.resend();
-
- if (sync)
- {
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- _channel.sync();
- _connection.writeFrame(recoverOk.generateFrame(getChannelId()));
-
- }
-
- }
-
- @Override
- public void receiveBasicReject(final long deliveryTag, final boolean requeue)
- {
- MessageInstance message = _channel.getUnacknowledgedMessageMap().get(deliveryTag);
-
- if (message == null)
- {
- _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
- }
- else
- {
-
- if (message.getMessage() == null)
- {
- _logger.warn("Message has already been purged, unable to Reject.");
- }
- else
- {
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
- ": Requeue:" + requeue +
- " on channel:" + _channel.debugIdentity());
- }
-
- if (requeue)
- {
- //this requeue represents a message rejected from the pre-dispatch queue
- //therefore we need to amend the delivery counter.
- message.decrementDeliveryCount();
-
- _channel.requeue(deliveryTag);
- }
- else
- {
- // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
- // as it would prevent redelivery
- // message.reject();
-
- final boolean maxDeliveryCountEnabled = _channel.isMaxDeliveryCountEnabled(deliveryTag);
- _logger.debug("maxDeliveryCountEnabled: "
- + maxDeliveryCountEnabled
- + " deliveryTag "
- + deliveryTag);
- if (maxDeliveryCountEnabled)
- {
- final boolean deliveredTooManyTimes = _channel.isDeliveredTooManyTimes(deliveryTag);
- _logger.debug("deliveredTooManyTimes: "
- + deliveredTooManyTimes
- + " deliveryTag "
- + deliveryTag);
- if (deliveredTooManyTimes)
- {
- _channel.deadLetter(deliveryTag);
- }
- else
- {
- //this requeue represents a message rejected because of a recover/rollback that we
- //are not ready to DLQ. We rely on the reject command to resend from the unacked map
- //and therefore need to increment the delivery counter so we cancel out the effect
- //of the AMQChannel#resend() decrement.
- message.incrementDeliveryCount();
- }
- }
- else
- {
- _channel.requeue(deliveryTag);
- }
- }
- }
- }
- }
-
- @Override
- public void receiveChannelClose()
- {
- _channel.sync();
- _connection.closeChannel(_channel);
-
- _connection.writeFrame(new AMQFrame(_channel.getChannelId(),
- _connection.getMethodRegistry().createChannelCloseOkBody()));
- }
-
- @Override
- public void receiveChannelCloseOk()
- {
- _connection.closeChannelOk(getChannelId());
- }
-
- @Override
- public void receiveChannelFlow(final boolean active)
- {
- _channel.sync();
- _channel.setSuspended(!active);
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(active);
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
-
- }
-
- @Override
- public void receiveExchangeBound(final AMQShortString exchangeName,
- final AMQShortString queueName,
- final AMQShortString routingKey)
- {
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
- _channel.sync();
-
- int replyCode;
- String replyText;
-
- if (isDefaultExchange(exchangeName))
- {
- if (routingKey == null)
- {
- if (queueName == null)
- {
- replyCode = virtualHost.getQueues().isEmpty()
- ? ExchangeBoundOkBody.NO_BINDINGS
- : ExchangeBoundOkBody.OK;
- replyText = null;
-
- }
- else
- {
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
- replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
- replyText = "Queue '" + queueName + "' not found";
- }
- else
- {
- replyCode = ExchangeBoundOkBody.OK;
- replyText = null;
- }
- }
- }
- else
- {
- if (queueName == null)
- {
- replyCode = virtualHost.getQueue(routingKey.toString()) == null
- ? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK
- : ExchangeBoundOkBody.OK;
- replyText = null;
- }
- else
- {
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
-
- replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
- replyText = "Queue '" + queueName + "' not found";
- }
- else
- {
- replyCode = queueName.equals(routingKey)
- ? ExchangeBoundOkBody.OK
- : ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
- replyText = null;
- }
- }
- }
- }
- else
- {
- ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
- if (exchange == null)
- {
-
- replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND;
- replyText = "Exchange '" + exchangeName + "' not found";
- }
- else if (routingKey == null)
- {
- if (queueName == null)
- {
- if (exchange.hasBindings())
- {
- replyCode = ExchangeBoundOkBody.OK;
- replyText = null;
- }
- else
- {
- replyCode = ExchangeBoundOkBody.NO_BINDINGS;
- replyText = null;
- }
- }
- else
- {
-
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
- replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
- replyText = "Queue '" + queueName + "' not found";
- }
- else
- {
- if (exchange.isBound(queue))
- {
- replyCode = ExchangeBoundOkBody.OK;
- replyText = null;
- }
- else
- {
- replyCode = ExchangeBoundOkBody.QUEUE_NOT_BOUND;
- replyText = "Queue '"
- + queueName
- + "' not bound to exchange '"
- + exchangeName
- + "'";
- }
- }
- }
- }
- else if (queueName != null)
- {
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
- replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
- replyText = "Queue '" + queueName + "' not found";
- }
- else
- {
- String bindingKey = routingKey == null ? null : routingKey.asString();
- if (exchange.isBound(bindingKey, queue))
- {
-
- replyCode = ExchangeBoundOkBody.OK;
- replyText = null;
- }
- else
- {
- replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
- replyText = "Queue '" + queueName + "' not bound with routing key '" +
- routingKey + "' to exchange '" + exchangeName + "'";
-
- }
- }
- }
- else
- {
- if (exchange.isBound(routingKey == null ? "" : routingKey.asString()))
- {
-
- replyCode = ExchangeBoundOkBody.OK;
- replyText = null;
- }
- else
- {
- replyCode = ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK;
- replyText =
- "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
- }
- }
- }
-
- ExchangeBoundOkBody exchangeBoundOkBody =
- methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText));
-
- _connection.writeFrame(exchangeBoundOkBody.generateFrame(getChannelId()));
-
- }
-
- @Override
- public void receiveExchangeDeclare(final AMQShortString exchangeName,
- final AMQShortString type,
- final boolean passive,
- final boolean durable,
- final boolean autoDelete,
- final boolean internal,
- final boolean nowait,
- final FieldTable arguments)
- {
- ExchangeImpl exchange;
- VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost();
- if (isDefaultExchange(exchangeName))
- {
- if (!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type))
- {
- closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
- + " of type "
- + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
- + " to " + type + ".");
- }
- else if (!nowait)
- {
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
- _channel.sync();
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
- }
-
- }
- else
- {
- if (passive)
- {
- exchange = virtualHost.getExchange(exchangeName.toString());
- if (exchange == null)
- {
- closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
- }
- else if (!(type == null || type.length() == 0) && !exchange.getType().equals(type.asString()))
- {
-
- closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
- +
- exchangeName
- + " of type "
- + exchange.getType()
- + " to "
- + type
- + ".");
- }
- else if (!nowait)
- {
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
- _channel.sync();
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
- }
-
- }
- else
- {
- try
- {
- String name = exchangeName == null ? null : exchangeName.intern().toString();
- String typeString = type == null ? null : type.intern().toString();
-
- Map<String, Object> attributes = new HashMap<String, Object>();
- if (arguments != null)
- {
- attributes.putAll(FieldTable.convertToMap(arguments));
- }
- attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
- attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE, typeString);
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
- attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- if (!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
- {
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
- }
- exchange = virtualHost.createExchange(attributes);
-
- if (!nowait)
- {
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
- _channel.sync();
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
- }
-
- }
- catch (ReservedExchangeNameException e)
- {
- closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix.");
-
-
- }
- catch (ExchangeExistsException e)
- {
- exchange = e.getExistingExchange();
- if (!new AMQShortString(exchange.getType()).equals(type))
- {
- closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type "
- + exchange.getType()
- + " to " + type + ".");
-
- }
- }
- catch (NoFactoryForTypeException e)
- {
- closeConnection(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"
- + e.getType()
- + "' for exchange '"
- + exchangeName
- + "'");
-
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
-
- }
- catch (UnknownConfiguredObjectException e)
- {
- // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
- final String message = "Unknown alternate exchange "
- + (e.getName() != null
- ? "name: \"" + e.getName() + "\""
- : "id: " + e.getId());
- closeConnection(AMQConstant.NOT_FOUND, message);
-
- }
- catch (IllegalArgumentException e)
- {
- closeConnection(AMQConstant.COMMAND_INVALID, "Error creating exchange '"
- + exchangeName
- + "': "
- + e.getMessage());
-
- }
- }
- }
-
- }
-
- @Override
- public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait)
- {
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
- _channel.sync();
- try
- {
-
- if (isDefaultExchange(exchangeStr))
- {
- closeConnection(AMQConstant.NOT_ALLOWED,
- "Default Exchange cannot be deleted");
-
- }
-
- else
- {
- final String exchangeName = exchangeStr.toString();
-
- final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
- if (exchange == null)
- {
- closeChannel(AMQConstant.NOT_FOUND, "No such exchange: " + exchangeStr);
- }
- else
- {
- virtualHost.removeExchange(exchange, !ifUnused);
-
- ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody();
-
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
- }
- }
- }
- catch (ExchangeIsAlternateException e)
- {
- closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
- }
- catch (RequiredExchangeException e)
- {
- closeChannel(AMQConstant.NOT_ALLOWED, "Exchange '" + exchangeStr + "' cannot be deleted");
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
- }
- }
-
- @Override
- public void receiveQueueBind(final AMQShortString queueName,
- final AMQShortString exchange,
- AMQShortString routingKey,
- final boolean nowait,
- final FieldTable argumentsTable)
- {
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
- AMQQueue<?> queue;
- if (queueName == null)
- {
-
- queue = _channel.getDefaultQueue();
-
- if (queue != null)
- {
- if (routingKey == null)
- {
- routingKey = AMQShortString.valueOf(queue.getName());
- }
- else
- {
- routingKey = routingKey.intern();
- }
- }
- }
- else
- {
- queue = virtualHost.getQueue(queueName.toString());
- routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey.intern();
- }
-
- if (queue == null)
- {
- String message = queueName == null
- ? "No default queue defined on channel and queue was null"
- : "Queue " + queueName + " does not exist.";
- closeChannel(AMQConstant.NOT_FOUND, message);
- }
- else if (isDefaultExchange(exchange))
- {
- closeConnection(AMQConstant.NOT_ALLOWED,
- "Cannot bind the queue " + queueName + " to the default exchange"
- );
-
- }
- else
- {
-
- final String exchangeName = exchange.toString();
-
- final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
- if (exch == null)
- {
- closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.");
- }
- else
- {
-
- try
- {
-
- Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable);
- String bindingKey = String.valueOf(routingKey);
-
- if (!exch.isBound(bindingKey, arguments, queue))
- {
-
- if (!exch.addBinding(bindingKey, queue, arguments)
- && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(
- exch.getType()))
- {
- exch.replaceBinding(bindingKey, queue, arguments);
- }
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Binding queue "
- + queue
- + " to exchange "
- + exch
- + " with routing key "
- + routingKey);
- }
- if (!nowait)
- {
- _channel.sync();
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
-
- }
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
- }
- }
- }
- }
-
- @Override
- public void receiveQueueDeclare(final AMQShortString queueStr,
- final boolean passive,
- final boolean durable,
- final boolean exclusive,
- final boolean autoDelete,
- final boolean nowait,
- final FieldTable arguments)
- {
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
-
- final AMQShortString queueName;
-
- // if we aren't given a queue name, we create one which we return to the client
- if ((queueStr == null) || (queueStr.length() == 0))
- {
- queueName = new AMQShortString("tmp_" + UUID.randomUUID());
- }
- else
- {
- queueName = queueStr.intern();
- }
-
- AMQQueue queue;
-
- //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
-
-
- if (passive)
- {
- queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
- {
- closeChannel(AMQConstant.NOT_FOUND,
- "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").");
- }
- else
- {
- if (!queue.verifySessionAccess(_channel))
- {
- closeConnection(AMQConstant.NOT_ALLOWED,
- "Queue "
- + queue.getName()
- + " is exclusive, but not created on this Connection.");
- }
- else
- {
- //set this as the default queue on the channel:
- _channel.setDefaultQueue(queue);
- if (!nowait)
- {
- _channel.sync();
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- QueueDeclareOkBody responseBody =
- methodRegistry.createQueueDeclareOkBody(queueName,
- queue.getQueueDepthMessages(),
- queue.getConsumerCount());
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
-
- _logger.info("Queue " + queueName + " declared successfully");
- }
- }
- }
- }
- else
- {
-
- try
- {
- Map<String, Object> attributes =
- QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments));
- final String queueNameString = AMQShortString.toString(queueName);
- attributes.put(Queue.NAME, queueNameString);
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.DURABLE, durable);
-
- LifetimePolicy lifetimePolicy;
- ExclusivityPolicy exclusivityPolicy;
-
- if (exclusive)
- {
- lifetimePolicy = autoDelete
- ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
- : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
- exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
- }
- else
- {
- lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
- exclusivityPolicy = ExclusivityPolicy.NONE;
- }
-
- attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
- attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
-
-
- queue = virtualHost.createQueue(attributes);
-
- _channel.setDefaultQueue(queue);
-
- if (!nowait)
- {
- _channel.sync();
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- QueueDeclareOkBody responseBody =
- methodRegistry.createQueueDeclareOkBody(queueName,
- queue.getQueueDepthMessages(),
- queue.getConsumerCount());
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
-
- _logger.info("Queue " + queueName + " declared successfully");
- }
- }
- catch (QueueExistsException qe)
- {
-
- queue = qe.getExistingQueue();
-
- if (!queue.verifySessionAccess(_channel))
- {
- closeConnection(AMQConstant.NOT_ALLOWED,
- "Queue "
- + queue.getName()
- + " is exclusive, but not created on this Connection.");
-
- }
- else if (queue.isExclusive() != exclusive)
- {
-
- closeChannel(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '"
- + queue.getName()
- + "' with different exclusivity (was: "
- + queue.isExclusive()
- + " requested "
- + exclusive
- + ")");
- }
- else if ((autoDelete
- && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
- || (!autoDelete && queue.getLifetimePolicy() != ((exclusive
- && !durable)
- ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
- : LifetimePolicy.PERMANENT)))
- {
- closeChannel(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '"
- + queue.getName()
- + "' with different lifetime policy (was: "
- + queue.getLifetimePolicy()
- + " requested autodelete: "
- + autoDelete
- + ")");
- }
- else if (queue.isDurable() != durable)
- {
- closeChannel(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '"
- + queue.getName()
- + "' with different durability (was: "
- + queue.isDurable()
- + " requested "
- + durable
- + ")");
- }
- else
- {
- _channel.setDefaultQueue(queue);
- if (!nowait)
- {
- _channel.sync();
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- QueueDeclareOkBody responseBody =
- methodRegistry.createQueueDeclareOkBody(queueName,
- queue.getQueueDepthMessages(),
- queue.getConsumerCount());
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
-
- _logger.info("Queue " + queueName + " declared successfully");
- }
- }
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
- }
-
- }
- }
-
- @Override
- public void receiveQueueDelete(final AMQShortString queueName,
- final boolean ifUnused,
- final boolean ifEmpty,
- final boolean nowait)
- {
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
- _channel.sync();
- AMQQueue queue;
- if (queueName == null)
- {
-
- //get the default queue on the channel:
- queue = _channel.getDefaultQueue();
- }
- else
- {
- queue = virtualHost.getQueue(queueName.toString());
- }
-
- if (queue == null)
- {
- closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
-
- }
- else
- {
- if (ifEmpty && !queue.isEmpty())
- {
- closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is not empty.");
- }
- else if (ifUnused && !queue.isUnused())
- {
- // TODO - Error code
- closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is still used.");
- }
- else
- {
- if (!queue.verifySessionAccess(_channel))
- {
- closeConnection(AMQConstant.NOT_ALLOWED,
- "Queue "
- + queue.getName()
- + " is exclusive, but not created on this Connection.");
-
- }
- else
- {
- int purged = 0;
- try
- {
- purged = virtualHost.removeQueue(queue);
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
-
- }
- }
- }
- }
- }
-
- @Override
- public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait)
- {
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
- AMQQueue queue = null;
- if (queueName == null && (queue = _channel.getDefaultQueue()) == null)
- {
-
- closeConnection(AMQConstant.NOT_ALLOWED, "No queue specified.");
- }
- else if ((queueName != null) && (queue = virtualHost.getQueue(queueName.toString())) == null)
- {
- closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
- }
- else if (!queue.verifySessionAccess(_channel))
- {
- closeConnection(AMQConstant.NOT_ALLOWED,
- "Queue is exclusive, but not created on this Connection."
- );
- }
- else
- {
- try
- {
- long purged = queue.clearQueue();
- if (!nowait)
- {
- _channel.sync();
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
-
- }
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
-
- }
-
- }
- }
-
- @Override
- public void receiveQueueUnbind(final AMQShortString queueName,
- final AMQShortString exchange,
- AMQShortString routingKey,
- final FieldTable arguments)
- {
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
-
-
-
- final boolean useDefaultQueue = queueName == null;
- final AMQQueue queue = useDefaultQueue
- ? _channel.getDefaultQueue()
- : virtualHost.getQueue(queueName.toString());
-
-
- if (queue == null)
- {
- String message = useDefaultQueue
- ? "No default queue defined on channel and queue was null"
- : "Queue " + queueName + " does not exist.";
- closeChannel(AMQConstant.NOT_FOUND, message);
- }
- else if (isDefaultExchange(exchange))
- {
- closeConnection(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue "
- + queue.getName()
- + " from the default exchange");
-
- }
- else
- {
-
- final ExchangeImpl exch = virtualHost.getExchange(exchange.toString());
-
- if (exch == null)
- {
- closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchange + " does not exist.");
- }
- else if (!exch.hasBinding(String.valueOf(routingKey), queue))
- {
- closeChannel(AMQConstant.NOT_FOUND, "No such binding");
- }
- else
- {
- try
- {
- exch.deleteBinding(String.valueOf(routingKey), queue);
-
- final AMQMethodBody responseBody = _connection.getMethodRegistry().createQueueUnbindOkBody();
- _channel.sync();
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
-
- }
- }
-
- }
- }
-
- @Override
- public void receiveTxSelect()
- {
- _channel.setLocalTransactional();
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
-
- }
-
- @Override
- public void receiveTxCommit()
- {
- if (!_channel.isTransactional())
- {
- closeChannel(AMQConstant.COMMAND_INVALID, "Fatal error: commit called on non-transactional channel");
- }
- _channel.commit(new Runnable()
- {
-
- @Override
- public void run()
- {
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
- }
- }, true);
-
- }
-
- @Override
- public void receiveTxRollback()
- {
- if (!_channel.isTransactional())
- {
- closeChannel(AMQConstant.COMMAND_INVALID, "Fatal error: rollback called on non-transactional channel");
- }
-
- final MethodRegistry methodRegistry = _connection.getMethodRegistry();
- final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
-
- Runnable task = new Runnable()
- {
-
- public void run()
- {
- _connection.writeFrame(responseBody.generateFrame(getChannelId()));
- }
- };
-
- _channel.rollback(task);
-
- //Now resend all the unacknowledged messages back to the original subscribers.
- //(Must be done after the TxnRollback-ok response).
- // Why, are we not allowed to send messages back to client before the ok method?
- _channel.resend();
- }
-
- private void closeChannel(final AMQConstant cause, final String message)
- {
- _connection.closeChannelAndWriteFrame(_channel, cause, message);
- }
-
- private void closeConnection(final AMQConstant cause, final String message)
- {
- _connection.closeConnection(cause, message, getChannelId());
- }
-
- private int getChannelId()
- {
- return _channel.getChannelId();
- }
-
- private boolean isDefaultExchange(final AMQShortString exchangeName)
- {
- return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName);
- }
-
- public static boolean performGet(final AMQQueue queue,
- final AMQProtocolEngine connection,
- final AMQChannel channel,
- final boolean acks)
- throws MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
- {
-
- final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
-
- final GetDeliveryMethod getDeliveryMethod =
- new GetDeliveryMethod(singleMessageCredit, connection, channel, queue);
- final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
- {
-
- public void recordMessageDelivery(final ConsumerImpl sub,
- final MessageInstance entry,
- final long deliveryTag)
- {
- channel.addUnacknowledgedMessage(entry, deliveryTag, null);
- }
- };
-
- ConsumerTarget_0_8 target;
- EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES);
- if (acks)
- {
-
- target = ConsumerTarget_0_8.createAckTarget(channel,
- AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod, getRecordMethod);
- }
- else
- {
- target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
- AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod, getRecordMethod);
- }
-
- ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
- sub.flush();
- sub.close();
- return getDeliveryMethod.hasDeliveredMessage();
-
-
- }
-
-
- private static class GetDeliveryMethod implements ClientDeliveryMethod
- {
-
- private final FlowCreditManager _singleMessageCredit;
- private final AMQProtocolEngine _connection;
- private final AMQChannel _channel;
- private final AMQQueue _queue;
- private boolean _deliveredMessage;
-
- public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
- final AMQProtocolEngine connection,
- final AMQChannel channel, final AMQQueue queue)
- {
- _singleMessageCredit = singleMessageCredit;
- _connection = connection;
- _channel = channel;
- _queue = queue;
- }
-
- @Override
- public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
- final InstanceProperties props, final long deliveryTag)
- {
- _singleMessageCredit.useCreditForMessage(message.getSize());
- long size = _connection.getProtocolOutputConverter().writeGetOk(message,
- props,
- _channel.getChannelId(),
- deliveryTag,
- _queue.getQueueDepthMessages());
-
- _deliveredMessage = true;
- return size;
- }
-
- public boolean hasDeliveredMessage()
- {
- return _deliveredMessage;
- }
- }
-
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
index 4df880d246..ac185d1aa9 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
@@ -83,7 +83,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
@Override
public Void run()
{
- action.onChannel(channel.getMethodProcessor());
+ action.onChannel(channel);
return null;
}
});
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
index a5866bb1f3..625836bcf2 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
@@ -70,7 +70,7 @@ public class ServerMethodProcessor implements MethodProcessor
@Override
public Void run()
{
- action.onChannel(channel.getMethodProcessor());
+ action.onChannel(channel);
return null;
}
});