diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java | 146 |
1 files changed, 56 insertions, 90 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 7574cc3533..b4f276a45a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -105,8 +105,8 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.TransportException; -public class AMQChannel<T extends AMQProtocolSession<T>> - implements AMQSessionModel<AMQChannel<T>,T>, +public class AMQChannel + implements AMQSessionModel<AMQChannel, AMQProtocolEngine>, AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -158,7 +158,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final T _session; + private final AMQProtocolEngine _connection; private AtomicBoolean _closing = new AtomicBoolean(false); private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); @@ -180,8 +180,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); - private final List<Action<? super AMQChannel<T>>> _taskList = - new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>(); + private final List<Action<? super AMQChannel>> _taskList = + new CopyOnWriteArrayList<Action<? super AMQChannel>>(); private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); @@ -191,17 +191,18 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; + private ChannelMethodProcessor _channelMethodProcessor; - public AMQChannel(T session, int channelId, final MessageStore messageStore) + public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) throws AMQException { - _session = session; + _connection = connection; _channelId = channelId; - _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(), - session.getAuthorizedSubject().getPublicCredentials(), - session.getAuthorizedSubject().getPrivateCredentials()); + _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(), + connection.getAuthorizedSubject().getPublicCredentials(), + connection.getAuthorizedSubject().getPrivateCredentials()); _subject.getPrincipals().add(new SessionPrincipal(this)); _logSubject = new ChannelLogSubject(this); @@ -210,7 +211,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // by default the session is non-transactional _transaction = new AsyncAutoCommitTransaction(_messageStore, this); - _clientDeliveryMethod = session.createDeliveryMethod(_channelId); + _clientDeliveryMethod = connection.createDeliveryMethod(_channelId); _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() { @@ -238,6 +239,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> return null; } }); + _channelMethodProcessor = new ChannelMethodProcessorImpl(this); } @@ -249,7 +251,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> @Override public long getActivityTime() { - return _session.getLastReceivedTime(); + return _connection.getLastReceivedTime(); } }); _txnStarts.incrementAndGet(); @@ -354,7 +356,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> final MessageMetaData messageMetaData = new MessageMetaData(_currentMessage.getMessagePublishInfo(), _currentMessage.getContentHeader(), - getProtocolSession().getLastReceivedTime()); + getConnection().getLastReceivedTime()); final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle); @@ -429,7 +431,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { long bodySize = _currentMessage.getSize(); long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp(); - _session.registerMessageReceived(bodySize, timestamp); + _connection.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } } @@ -442,13 +444,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * Pre-requisite: the current message is judged to have no destination queues. * * @throws AMQConnectionException if the message is mandatory close-on-no-route - * @see AMQProtocolSession#isCloseWhenNoRoute() + * @see AMQProtocolEngine#isCloseWhenNoRoute() */ private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException { boolean mandatory = message.isMandatory(); String description = currentMessageDescription(); - boolean closeOnNoRoute = _session.isCloseWhenNoRoute(); + boolean closeOnNoRoute = _connection.isCloseWhenNoRoute(); if(_logger.isDebugEnabled()) { @@ -457,13 +459,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> description, mandatory, isTransactional(), closeOnNoRoute)); } - if (mandatory && isTransactional() && _session.isCloseWhenNoRoute()) + if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute()) { throw new AMQConnectionException( AMQConstant.NO_ROUTE, "No route for message " + currentMessageDescription(), 0, 0, // default class and method ids - getProtocolSession().getMethodRegistry(), + getConnection().getMethodRegistry(), (Throwable) null); } @@ -564,9 +566,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>> */ public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) - throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, - MessageSource.ConsumerAccessRefused + throws MessageSource.ExistingConsumerPreventsExclusive, + MessageSource.ExistingExclusiveConsumer, + AMQInvalidArgumentException, + MessageSource.ConsumerAccessRefused, ConsumerTagInUseException { if (tag == null) { @@ -575,7 +578,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> if (_tag2SubscriptionTargetMap.containsKey(tag)) { - throw new AMQException("Consumer already exists with same tag: " + tag); + throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag); } ConsumerTarget_0_8 target; @@ -647,27 +650,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } } - catch (AccessControlException e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (MessageSource.ExistingExclusiveConsumer e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (MessageSource.ExistingConsumerPreventsExclusive e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (AMQInvalidArgumentException e) - { - _tag2SubscriptionTargetMap.remove(tag); - throw e; - } - catch (MessageSource.ConsumerAccessRefused e) + catch (AccessControlException + | MessageSource.ExistingExclusiveConsumer + | MessageSource.ExistingConsumerPreventsExclusive + | AMQInvalidArgumentException + | MessageSource.ConsumerAccessRefused e) { _tag2SubscriptionTargetMap.remove(tag); throw e; @@ -728,7 +715,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> unsubscribeAllConsumers(); - for (Action<? super AMQChannel<T>> task : _taskList) + for (Action<? super AMQChannel> task : _taskList) { task.performAction(this); } @@ -895,9 +882,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> /** * Called to resend all outstanding unacknowledged messages to this same channel. * - * @throws AMQException When something goes wrong. */ - public void resend() throws AMQException + public void resend() { @@ -983,9 +969,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only * acknowledges the single message specified by the delivery tag * - * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException + public void acknowledgeMessage(long deliveryTag, boolean multiple) { Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); @@ -1082,22 +1067,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public boolean isSuspended() { - return _suspended.get() || _closing.get() || _session.isClosing(); - } - - public void commit() throws AMQException - { - commit(null, false); + return _suspended.get() || _closing.get() || _connection.isClosing(); } - public void commit(final Runnable immediateAction, boolean async) throws AMQException + public void commit(final Runnable immediateAction, boolean async) { - if (!isTransactional()) - { - throw new AMQException("Fatal error: commit called on non-transactional channel"); - } if(async && _transaction instanceof LocalTransaction) { @@ -1130,17 +1106,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } - public void rollback() throws AMQException - { - rollback(NULL_TASK); - } - - public void rollback(Runnable postRollbackTask) throws AMQException + public void rollback(Runnable postRollbackTask) { - if (!isTransactional()) - { - throw new AMQException("Fatal error: commit called on non-transactional channel"); - } // stop all subscriptions _rollingBack = true; @@ -1198,7 +1165,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public String toString() { - return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]"; + return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]"; } public void setDefaultQueue(AMQQueue queue) @@ -1217,9 +1184,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>> return _closing.get(); } - public AMQProtocolSession getProtocolSession() + public AMQProtocolEngine getConnection() { - return _session; + return _connection; } public FlowCreditManager getCreditManager() @@ -1262,7 +1229,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> throws AMQException { - AMQMessage message = new AMQMessage(handle, _session.getReference()); + AMQMessage message = new AMQMessage(handle, _connection.getReference()); final BasicContentHeaderProperties properties = incomingMessage.getContentHeader().getProperties(); @@ -1273,7 +1240,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private boolean checkMessageUserId(ContentHeaderBody header) { AMQShortString userID = header.getProperties().getUserId(); - return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); + return (!_messageAuthorizationRequired || _connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); } @@ -1284,14 +1251,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } @Override - public T getConnectionModel() + public AMQProtocolEngine getConnectionModel() { - return _session; + return _connection; } public String getClientID() { - return String.valueOf(_session.getContextKey()); + return String.valueOf(_connection.getContextKey()); } public LogSubject getLogSubject() @@ -1306,13 +1273,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } @Override - public void addDeleteTask(final Action<? super AMQChannel<T>> task) + public void addDeleteTask(final Action<? super AMQChannel> task) { _taskList.add(task); } @Override - public void removeDeleteTask(final Action<? super AMQChannel<T>> task) + public void removeDeleteTask(final Action<? super AMQChannel> task) { _taskList.remove(task); } @@ -1324,8 +1291,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public ChannelMethodProcessor getMethodProcessor() { - // TODO - return null; + return _channelMethodProcessor; } @@ -1356,7 +1322,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public void postCommit() { final ProtocolOutputConverter outputConverter = - _session.getProtocolOutputConverter(); + _connection.getProtocolOutputConverter(); outputConverter.writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), @@ -1479,7 +1445,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public void postCommit() { AMQMessage message = _reference.getMessage(); - _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), + _connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), message, _channelId, @@ -1548,7 +1514,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> @Override public Object getConnectionReference() { - return getProtocolSession().getReference(); + return getConnection().getReference(); } public int getUnacknowledgedMessageCount() @@ -1558,9 +1524,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private void flow(boolean flow) { - MethodRegistry methodRegistry = _session.getMethodRegistry(); + MethodRegistry methodRegistry = _connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); - _session.writeFrame(responseBody.generateFrame(_channelId)); + _connection.writeFrame(responseBody.generateFrame(_channelId)); } @Override @@ -1571,7 +1537,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public VirtualHostImpl getVirtualHost() { - return getProtocolSession().getVirtualHost(); + return getConnection().getVirtualHost(); } public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) @@ -1585,11 +1551,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> */ private void closeConnection(String reason) throws AMQException { - Lock receivedLock = _session.getReceivedLock(); + Lock receivedLock = _connection.getReceivedLock(); receivedLock.lock(); try { - _session.close(AMQConstant.RESOURCE_ERROR, reason); + _connection.close(AMQConstant.RESOURCE_ERROR, reason); } finally { @@ -1597,7 +1563,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } - public void deadLetter(long deliveryTag) throws AMQException + public void deadLetter(long deliveryTag) { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag); |