diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java | 47 |
1 files changed, 28 insertions, 19 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index d2b1f83513..dc9a6484fa 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -56,9 +56,12 @@ import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -256,7 +259,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return _channelId; } - public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException + public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException { String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString(); SecurityManager securityManager = getVirtualHost().getSecurityManager(); @@ -265,7 +268,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F throw new AMQSecurityException("Permission denied: " + e.getName()); } _currentMessage = new IncomingMessage(info); - _currentMessage.setExchange(e); + _currentMessage.setMessageDestination(e); } public void publishContentHeader(ContentHeaderBody contentHeaderBody) @@ -350,7 +353,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } }; - int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction, + int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction, immediate ? _immediateAction : _capacityCheckAction); if(enqueues == 0) { @@ -497,19 +500,19 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. * + * * @param tag the tag chosen by the client (if null, server will generate one) - * @param queue the queue to subscribe to + * @param source the queue to subscribe to * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber * - * @param noLocal Flag stopping own messages being received. * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * * @throws AMQException if something goes wrong */ - public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException + public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, + FieldTable filters, boolean exclusive) throws AMQException { if (tag == null) { @@ -557,7 +560,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F try { Consumer sub = - queue.addConsumer(target, + source.addConsumer(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), @@ -1189,16 +1192,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - private class ImmediateAction implements Action<QueueEntry> + private class ImmediateAction implements Action<MessageInstance> { public ImmediateAction() { } - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - AMQQueue queue = entry.getQueue(); + TransactionLogResource queue = entry.getOwningResource(); if (!entry.getDeliveredToConsumer() && entry.acquire()) { @@ -1246,19 +1249,25 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { - queue.checkCapacity(AMQChannel.this); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(AMQChannel.this); + } } } } - private final class CapacityCheckAction implements Action<QueueEntry> + private final class CapacityCheckAction implements Action<MessageInstance> { @Override - public void performAction(final QueueEntry entry) + public void performAction(final MessageInstance entry) { - AMQQueue queue = entry.getQueue(); - queue.checkCapacity(AMQChannel.this); + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(AMQChannel.this); + } } } @@ -1477,13 +1486,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F final ServerMessage msg = rejectedQueueEntry.getMessage(); final Consumer sub = rejectedQueueEntry.getDeliveredConsumer(); - int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>() + int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>() { @Override - public void performAction(final QueueEntry requeueEntry) + public void performAction(final MessageInstance requeueEntry) { _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getQueue().getName())); + requeueEntry.getOwningResource().getName())); } }, null); |