summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java269
1 files changed, 196 insertions, 73 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 333c1b9cac..1b03ee2334 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,45 +20,72 @@
*/
package org.apache.qpid.server;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SessionConfigType;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.*;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.txn.*;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
-public class AMQChannel
+public class AMQChannel implements SessionConfig
{
public static final int DEFAULT_PREFETCH = 5000;
@@ -123,6 +150,7 @@ public class AMQChannel
private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
+ private final UUID _id;
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -132,15 +160,22 @@ public class AMQChannel
_actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
_logSubject = new ChannelLogSubject(this);
-
+ _id = getConfigStore().createId();
_actor.message(ChannelMessages.CHN_CREATE());
+ getConfigStore().addConfiguredObject(this);
+
_messageStore = messageStore;
// by default the session is non-transactional
_transaction = new AutoCommitTransaction(_messageStore);
}
+ public ConfigStore getConfigStore()
+ {
+ return getVirtualHost().getConfigStore();
+ }
+
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
@@ -220,7 +255,7 @@ public class AMQChannel
try
{
- final ArrayList<AMQQueue> destinationQueues = _currentMessage.getDestinationQueues();
+ final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
{
@@ -411,6 +446,8 @@ public class AMQChannel
_logger.error("Caught AMQException whilst attempting to reque:" + e);
}
+ getConfigStore().removeConfiguredObject(this);
+
}
private void setClosing(boolean closing)
@@ -970,10 +1007,10 @@ public class AMQChannel
private class MessageDeliveryAction implements ServerTransaction.Action
{
private IncomingMessage _incommingMessage;
- private ArrayList<AMQQueue> _destinationQueues;
+ private ArrayList<? extends BaseQueue> _destinationQueues;
public MessageDeliveryAction(IncomingMessage currentMessage,
- ArrayList<AMQQueue> destinationQueues)
+ ArrayList<? extends BaseQueue> destinationQueues)
{
_incommingMessage = currentMessage;
_destinationQueues = destinationQueues;
@@ -988,53 +1025,24 @@ public class AMQChannel
final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
MessageReference ref = amqMessage.newReference();
- for(AMQQueue queue : _destinationQueues)
+ for(final BaseQueue queue : _destinationQueues)
{
+ BaseQueue.PostEnqueueAction action;
- QueueEntry entry = queue.enqueue(amqMessage);
- queue.checkCapacity(AMQChannel.this);
-
-
- if(immediate && !entry.getDeliveredToConsumer() && entry.acquire())
+ if(immediate)
{
+ action = new ImmediateAction(queue);
+ }
+ else
+ {
+ action = null;
+ }
+ queue.enqueue(amqMessage, action);
- ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
- final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
- {
- @Override
- public void postCommit()
- {
- try
- {
- final
- ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
-
- outputConverter.writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- IMMEDIATE_DELIVERY_REPLY_TEXT);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- super.postCommit();
- }
- }
- );
- txn.commit();
-
-
-
-
+ if(queue instanceof AMQQueue)
+ {
+ ((AMQQueue)queue).checkCapacity(AMQChannel.this);
}
}
@@ -1057,6 +1065,60 @@ public class AMQChannel
// Maybe keep track of entries that were created and then delete them here in case of failure
// to in memory enqueue
}
+
+ private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ {
+ private final BaseQueue _queue;
+
+ public ImmediateAction(BaseQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public void onEnqueue(QueueEntry entry)
+ {
+ if (!entry.getDeliveredToConsumer() && entry.acquire())
+ {
+
+
+ ServerTransaction txn = new LocalTransaction(_messageStore);
+ Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+ entries.add(entry);
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ txn.dequeue(_queue, entry.getMessage(),
+ new MessageAcknowledgeAction(entries)
+ {
+ @Override
+ public void postCommit()
+ {
+ try
+ {
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ super.postCommit();
+ }
+ }
+ );
+ txn.commit();
+
+
+ }
+
+ }
+ }
}
private class MessageAcknowledgeAction implements ServerTransaction.Action
@@ -1163,7 +1225,7 @@ public class AMQChannel
if(_blocking.compareAndSet(false,true))
{
- _actor.message(_logSubject, ChannelMessages.CHN_FLOW_ENFORCED(queue.getName().toString()));
+ _actor.message(_logSubject, ChannelMessages.CHN_FLOW_ENFORCED(queue.getNameShortString().toString()));
flow(false);
}
}
@@ -1188,9 +1250,70 @@ public class AMQChannel
AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
_session.writeFrame(responseBody.generateFrame(_channelId));
}
-
+
public boolean getBlocking()
{
return _blocking.get();
}
+
+ public VirtualHost getVirtualHost()
+ {
+ return getProtocolSession().getVirtualHost();
+ }
+
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public SessionConfigType getConfigType()
+ {
+ return SessionConfigType.getInstance();
+ }
+
+ public int getChannel()
+ {
+ return getChannelId();
+ }
+
+ public boolean isAttached()
+ {
+ return true;
+ }
+
+ public long getDetachedLifespan()
+ {
+ return 0;
+ }
+
+ public ConnectionConfig getConnectionConfig()
+ {
+ return (AMQProtocolEngine)getProtocolSession();
+ }
+
+ public Long getExpiryTime()
+ {
+ return null;
+ }
+
+ public Long getMaxClientRate()
+ {
+ return null;
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public String getSessionName()
+ {
+ return getConnectionConfig().getAddress() + "/" + getChannelId();
+ }
}