summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java146
1 files changed, 56 insertions, 90 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 7574cc3533..b4f276a45a 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -105,8 +105,8 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.TransportException;
-public class AMQChannel<T extends AMQProtocolSession<T>>
- implements AMQSessionModel<AMQChannel<T>,T>,
+public class AMQChannel
+ implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -158,7 +158,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final T _session;
+ private final AMQProtocolEngine _connection;
private AtomicBoolean _closing = new AtomicBoolean(false);
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -180,8 +180,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
- private final List<Action<? super AMQChannel<T>>> _taskList =
- new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+ private final List<Action<? super AMQChannel>> _taskList =
+ new CopyOnWriteArrayList<Action<? super AMQChannel>>();
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
@@ -191,17 +191,18 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
+ private ChannelMethodProcessor _channelMethodProcessor;
- public AMQChannel(T session, int channelId, final MessageStore messageStore)
+ public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
throws AMQException
{
- _session = session;
+ _connection = connection;
_channelId = channelId;
- _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(),
- session.getAuthorizedSubject().getPublicCredentials(),
- session.getAuthorizedSubject().getPrivateCredentials());
+ _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(),
+ connection.getAuthorizedSubject().getPublicCredentials(),
+ connection.getAuthorizedSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
_logSubject = new ChannelLogSubject(this);
@@ -210,7 +211,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// by default the session is non-transactional
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
- _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+ _clientDeliveryMethod = connection.createDeliveryMethod(_channelId);
_transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
{
@@ -238,6 +239,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
return null;
}
});
+ _channelMethodProcessor = new ChannelMethodProcessorImpl(this);
}
@@ -249,7 +251,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
@Override
public long getActivityTime()
{
- return _session.getLastReceivedTime();
+ return _connection.getLastReceivedTime();
}
});
_txnStarts.incrementAndGet();
@@ -354,7 +356,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
final MessageMetaData messageMetaData =
new MessageMetaData(_currentMessage.getMessagePublishInfo(),
_currentMessage.getContentHeader(),
- getProtocolSession().getLastReceivedTime());
+ getConnection().getLastReceivedTime());
final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
@@ -429,7 +431,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
long bodySize = _currentMessage.getSize();
long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
- _session.registerMessageReceived(bodySize, timestamp);
+ _connection.registerMessageReceived(bodySize, timestamp);
_currentMessage = null;
}
}
@@ -442,13 +444,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
* Pre-requisite: the current message is judged to have no destination queues.
*
* @throws AMQConnectionException if the message is mandatory close-on-no-route
- * @see AMQProtocolSession#isCloseWhenNoRoute()
+ * @see AMQProtocolEngine#isCloseWhenNoRoute()
*/
private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
{
boolean mandatory = message.isMandatory();
String description = currentMessageDescription();
- boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
+ boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
if(_logger.isDebugEnabled())
{
@@ -457,13 +459,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
description, mandatory, isTransactional(), closeOnNoRoute));
}
- if (mandatory && isTransactional() && _session.isCloseWhenNoRoute())
+ if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
{
throw new AMQConnectionException(
AMQConstant.NO_ROUTE,
"No route for message " + currentMessageDescription(),
0, 0, // default class and method ids
- getProtocolSession().getMethodRegistry(),
+ getConnection().getMethodRegistry(),
(Throwable) null);
}
@@ -564,9 +566,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
*/
public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
- throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
- MessageSource.ConsumerAccessRefused
+ throws MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer,
+ AMQInvalidArgumentException,
+ MessageSource.ConsumerAccessRefused, ConsumerTagInUseException
{
if (tag == null)
{
@@ -575,7 +578,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
if (_tag2SubscriptionTargetMap.containsKey(tag))
{
- throw new AMQException("Consumer already exists with same tag: " + tag);
+ throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
}
ConsumerTarget_0_8 target;
@@ -647,27 +650,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
}
}
- catch (AccessControlException e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (MessageSource.ExistingExclusiveConsumer e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (MessageSource.ExistingConsumerPreventsExclusive e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (AMQInvalidArgumentException e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (MessageSource.ConsumerAccessRefused e)
+ catch (AccessControlException
+ | MessageSource.ExistingExclusiveConsumer
+ | MessageSource.ExistingConsumerPreventsExclusive
+ | AMQInvalidArgumentException
+ | MessageSource.ConsumerAccessRefused e)
{
_tag2SubscriptionTargetMap.remove(tag);
throw e;
@@ -728,7 +715,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
unsubscribeAllConsumers();
- for (Action<? super AMQChannel<T>> task : _taskList)
+ for (Action<? super AMQChannel> task : _taskList)
{
task.performAction(this);
}
@@ -895,9 +882,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
- * @throws AMQException When something goes wrong.
*/
- public void resend() throws AMQException
+ public void resend()
{
@@ -983,9 +969,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
* @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
* acknowledges the single message specified by the delivery tag
*
- * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
@@ -1082,22 +1067,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public boolean isSuspended()
{
- return _suspended.get() || _closing.get() || _session.isClosing();
- }
-
- public void commit() throws AMQException
- {
- commit(null, false);
+ return _suspended.get() || _closing.get() || _connection.isClosing();
}
- public void commit(final Runnable immediateAction, boolean async) throws AMQException
+ public void commit(final Runnable immediateAction, boolean async)
{
- if (!isTransactional())
- {
- throw new AMQException("Fatal error: commit called on non-transactional channel");
- }
if(async && _transaction instanceof LocalTransaction)
{
@@ -1130,17 +1106,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
}
- public void rollback() throws AMQException
- {
- rollback(NULL_TASK);
- }
-
- public void rollback(Runnable postRollbackTask) throws AMQException
+ public void rollback(Runnable postRollbackTask)
{
- if (!isTransactional())
- {
- throw new AMQException("Fatal error: commit called on non-transactional channel");
- }
// stop all subscriptions
_rollingBack = true;
@@ -1198,7 +1165,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public String toString()
{
- return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
+ return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
}
public void setDefaultQueue(AMQQueue queue)
@@ -1217,9 +1184,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
return _closing.get();
}
- public AMQProtocolSession getProtocolSession()
+ public AMQProtocolEngine getConnection()
{
- return _session;
+ return _connection;
}
public FlowCreditManager getCreditManager()
@@ -1262,7 +1229,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
throws AMQException
{
- AMQMessage message = new AMQMessage(handle, _session.getReference());
+ AMQMessage message = new AMQMessage(handle, _connection.getReference());
final BasicContentHeaderProperties properties =
incomingMessage.getContentHeader().getProperties();
@@ -1273,7 +1240,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private boolean checkMessageUserId(ContentHeaderBody header)
{
AMQShortString userID = header.getProperties().getUserId();
- return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
+ return (!_messageAuthorizationRequired || _connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
}
@@ -1284,14 +1251,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
@Override
- public T getConnectionModel()
+ public AMQProtocolEngine getConnectionModel()
{
- return _session;
+ return _connection;
}
public String getClientID()
{
- return String.valueOf(_session.getContextKey());
+ return String.valueOf(_connection.getContextKey());
}
public LogSubject getLogSubject()
@@ -1306,13 +1273,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
@Override
- public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+ public void addDeleteTask(final Action<? super AMQChannel> task)
{
_taskList.add(task);
}
@Override
- public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+ public void removeDeleteTask(final Action<? super AMQChannel> task)
{
_taskList.remove(task);
}
@@ -1324,8 +1291,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public ChannelMethodProcessor getMethodProcessor()
{
- // TODO
- return null;
+ return _channelMethodProcessor;
}
@@ -1356,7 +1322,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public void postCommit()
{
final ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
+ _connection.getProtocolOutputConverter();
outputConverter.writeReturn(message.getMessagePublishInfo(),
message.getContentHeaderBody(),
@@ -1479,7 +1445,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public void postCommit()
{
AMQMessage message = _reference.getMessage();
- _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+ _connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
message.getContentHeaderBody(),
message,
_channelId,
@@ -1548,7 +1514,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
@Override
public Object getConnectionReference()
{
- return getProtocolSession().getReference();
+ return getConnection().getReference();
}
public int getUnacknowledgedMessageCount()
@@ -1558,9 +1524,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private void flow(boolean flow)
{
- MethodRegistry methodRegistry = _session.getMethodRegistry();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
- _session.writeFrame(responseBody.generateFrame(_channelId));
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
}
@Override
@@ -1571,7 +1537,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public VirtualHostImpl getVirtualHost()
{
- return getProtocolSession().getVirtualHost();
+ return getConnection().getVirtualHost();
}
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
@@ -1585,11 +1551,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
*/
private void closeConnection(String reason) throws AMQException
{
- Lock receivedLock = _session.getReceivedLock();
+ Lock receivedLock = _connection.getReceivedLock();
receivedLock.lock();
try
{
- _session.close(AMQConstant.RESOURCE_ERROR, reason);
+ _connection.close(AMQConstant.RESOURCE_ERROR, reason);
}
finally
{
@@ -1597,7 +1563,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
}
- public void deadLetter(long deliveryTag) throws AMQException
+ public void deadLetter(long deliveryTag)
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);