diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 269 |
1 files changed, 196 insertions, 73 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 333c1b9cac..1b03ee2334 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,45 +20,72 @@ */ package org.apache.qpid.server; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConfiguredObject; +import org.apache.qpid.server.configuration.ConnectionConfig; +import org.apache.qpid.server.configuration.SessionConfig; +import org.apache.qpid.server.configuration.SessionConfigType; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.*; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.txn.*; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.messages.ChannelMessages; -import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.protocol.AMQProtocolEngine; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.RecordDeliveryMethod; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; -public class AMQChannel +public class AMQChannel implements SessionConfig { public static final int DEFAULT_PREFETCH = 5000; @@ -123,6 +150,7 @@ public class AMQChannel private List<QueueEntry> _resendList = new ArrayList<QueueEntry>(); private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); + private final UUID _id; public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException @@ -132,15 +160,22 @@ public class AMQChannel _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); _logSubject = new ChannelLogSubject(this); - + _id = getConfigStore().createId(); _actor.message(ChannelMessages.CHN_CREATE()); + getConfigStore().addConfiguredObject(this); + _messageStore = messageStore; // by default the session is non-transactional _transaction = new AutoCommitTransaction(_messageStore); } + public ConfigStore getConfigStore() + { + return getVirtualHost().getConfigStore(); + } + /** Sets this channel to be part of a local transaction */ public void setLocalTransactional() { @@ -220,7 +255,7 @@ public class AMQChannel try { - final ArrayList<AMQQueue> destinationQueues = _currentMessage.getDestinationQueues(); + final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); if(!checkMessageUserId(_currentMessage.getContentHeader())) { @@ -411,6 +446,8 @@ public class AMQChannel _logger.error("Caught AMQException whilst attempting to reque:" + e); } + getConfigStore().removeConfiguredObject(this); + } private void setClosing(boolean closing) @@ -970,10 +1007,10 @@ public class AMQChannel private class MessageDeliveryAction implements ServerTransaction.Action { private IncomingMessage _incommingMessage; - private ArrayList<AMQQueue> _destinationQueues; + private ArrayList<? extends BaseQueue> _destinationQueues; public MessageDeliveryAction(IncomingMessage currentMessage, - ArrayList<AMQQueue> destinationQueues) + ArrayList<? extends BaseQueue> destinationQueues) { _incommingMessage = currentMessage; _destinationQueues = destinationQueues; @@ -988,53 +1025,24 @@ public class AMQChannel final AMQMessage amqMessage = createAMQMessage(_incommingMessage); MessageReference ref = amqMessage.newReference(); - for(AMQQueue queue : _destinationQueues) + for(final BaseQueue queue : _destinationQueues) { + BaseQueue.PostEnqueueAction action; - QueueEntry entry = queue.enqueue(amqMessage); - queue.checkCapacity(AMQChannel.this); - - - if(immediate && !entry.getDeliveredToConsumer() && entry.acquire()) + if(immediate) { + action = new ImmediateAction(queue); + } + else + { + action = null; + } + queue.enqueue(amqMessage, action); - ServerTransaction txn = new LocalTransaction(_messageStore); - Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); - entries.add(entry); - final AMQMessage message = (AMQMessage) entry.getMessage(); - txn.dequeue(queue, entry.getMessage(), - new MessageAcknowledgeAction(entries) - { - @Override - public void postCommit() - { - try - { - final - ProtocolOutputConverter outputConverter = - _session.getProtocolOutputConverter(); - - outputConverter.writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message, - _channelId, - AMQConstant.NO_CONSUMERS.getCode(), - IMMEDIATE_DELIVERY_REPLY_TEXT); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - super.postCommit(); - } - } - ); - txn.commit(); - - - - + if(queue instanceof AMQQueue) + { + ((AMQQueue)queue).checkCapacity(AMQChannel.this); } } @@ -1057,6 +1065,60 @@ public class AMQChannel // Maybe keep track of entries that were created and then delete them here in case of failure // to in memory enqueue } + + private class ImmediateAction implements BaseQueue.PostEnqueueAction + { + private final BaseQueue _queue; + + public ImmediateAction(BaseQueue queue) + { + _queue = queue; + } + + public void onEnqueue(QueueEntry entry) + { + if (!entry.getDeliveredToConsumer() && entry.acquire()) + { + + + ServerTransaction txn = new LocalTransaction(_messageStore); + Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); + entries.add(entry); + final AMQMessage message = (AMQMessage) entry.getMessage(); + txn.dequeue(_queue, entry.getMessage(), + new MessageAcknowledgeAction(entries) + { + @Override + public void postCommit() + { + try + { + final + ProtocolOutputConverter outputConverter = + _session.getProtocolOutputConverter(); + + outputConverter.writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, + _channelId, + AMQConstant.NO_CONSUMERS.getCode(), + IMMEDIATE_DELIVERY_REPLY_TEXT); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + super.postCommit(); + } + } + ); + txn.commit(); + + + } + + } + } } private class MessageAcknowledgeAction implements ServerTransaction.Action @@ -1163,7 +1225,7 @@ public class AMQChannel if(_blocking.compareAndSet(false,true)) { - _actor.message(_logSubject, ChannelMessages.CHN_FLOW_ENFORCED(queue.getName().toString())); + _actor.message(_logSubject, ChannelMessages.CHN_FLOW_ENFORCED(queue.getNameShortString().toString())); flow(false); } } @@ -1188,9 +1250,70 @@ public class AMQChannel AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); _session.writeFrame(responseBody.generateFrame(_channelId)); } - + public boolean getBlocking() { return _blocking.get(); } + + public VirtualHost getVirtualHost() + { + return getProtocolSession().getVirtualHost(); + } + + + public ConfiguredObject getParent() + { + return getVirtualHost(); + } + + public SessionConfigType getConfigType() + { + return SessionConfigType.getInstance(); + } + + public int getChannel() + { + return getChannelId(); + } + + public boolean isAttached() + { + return true; + } + + public long getDetachedLifespan() + { + return 0; + } + + public ConnectionConfig getConnectionConfig() + { + return (AMQProtocolEngine)getProtocolSession(); + } + + public Long getExpiryTime() + { + return null; + } + + public Long getMaxClientRate() + { + return null; + } + + public boolean isDurable() + { + return false; + } + + public UUID getId() + { + return _id; + } + + public String getSessionName() + { + return getConnectionConfig().getAddress() + "/" + getChannelId(); + } } |