diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java | 129 |
1 files changed, 78 insertions, 51 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 27fa654843..4087b1f4a0 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -115,7 +115,7 @@ import org.apache.qpid.transport.TransportException; public class AMQChannel implements AMQSessionModel<AMQChannel, AMQProtocolEngine>, AsyncAutoCommitTransaction.FutureRecorder, - ChannelMethodProcessor + ServerChannelMethodProcessor { public static final int DEFAULT_PREFETCH = 4096; @@ -376,27 +376,18 @@ public class AMQChannel } public void publishContentHeader(ContentHeaderBody contentHeaderBody) - throws AMQException { - if (_currentMessage == null) + if (_logger.isDebugEnabled()) { - throw new AMQException("Received content header without previously receiving a BasicPublish frame"); + _logger.debug("Content header received on channel " + _channelId); } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content header received on channel " + _channelId); - } - _currentMessage.setContentHeaderBody(contentHeaderBody); + _currentMessage.setContentHeaderBody(contentHeaderBody); - deliverCurrentMessageIfComplete(); - } + deliverCurrentMessageIfComplete(); } private void deliverCurrentMessageIfComplete() - throws AMQException { // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) @@ -497,7 +488,7 @@ public class AMQChannel * @throws AMQConnectionException if the message is mandatory close-on-no-route * @see AMQProtocolEngine#isCloseWhenNoRoute() */ - private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException + private void handleUnroutableMessage(AMQMessage message) { boolean mandatory = message.isMandatory(); String description = currentMessageDescription(); @@ -512,26 +503,27 @@ public class AMQChannel if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute()) { - throw new AMQConnectionException( - AMQConstant.NO_ROUTE, - "No route for message " + currentMessageDescription(), - 0, 0, // default class and method ids - getConnection().getMethodRegistry(), - (Throwable) null); - } - - if (mandatory || message.isImmediate()) - { - _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message)); + _connection.closeConnection(AMQConstant.NO_ROUTE, + "No route for message " + currentMessageDescription(), _channelId); } else { - AMQShortString exchangeName = _currentMessage.getExchangeName(); - AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); + if (mandatory || message.isImmediate()) + { + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, + "No Route for message " + + currentMessageDescription(), + message)); + } + else + { + AMQShortString exchangeName = _currentMessage.getExchangeName(); + AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); - getVirtualHost().getEventLogger().message( - ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), - routingKey == null ? null : routingKey.asString())); + getVirtualHost().getEventLogger().message( + ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), + routingKey == null ? null : routingKey.asString())); + } } } @@ -550,13 +542,8 @@ public class AMQChannel : _currentMessage.getMessagePublishInfo().getRoutingKey().toString()); } - public void publishContentBody(ContentBody contentBody) throws AMQException + public void publishContentBody(ContentBody contentBody) { - if (_currentMessage == null) - { - throw new AMQException("Received content body without previously receiving a Content Header"); - } - if (_logger.isDebugEnabled()) { _logger.debug(debugIdentity() + " content body received on channel " + _channelId); @@ -568,13 +555,6 @@ public class AMQChannel deliverCurrentMessageIfComplete(); } - catch (AMQException e) - { - // we want to make sure we don't keep a reference to the message in the - // event of an error - _currentMessage = null; - throw e; - } catch (RuntimeException e) { // we want to make sure we don't keep a reference to the message in the @@ -1277,14 +1257,10 @@ public class AMQChannel private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle) - throws AMQException { AMQMessage message = new AMQMessage(handle, _connection.getReference()); - final BasicContentHeaderProperties properties = - incomingMessage.getContentHeader().getProperties(); - return message; } @@ -1340,6 +1316,11 @@ public class AMQChannel return _subject; } + public boolean hasCurrentMessage() + { + return _currentMessage != null; + } + private class GetDeliveryMethod implements ClientDeliveryMethod { @@ -2242,7 +2223,10 @@ public class AMQChannel } @Override - public void receiveChannelClose() + public void receiveChannelClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) { sync(); _connection.closeChannel(this); @@ -2258,6 +2242,43 @@ public class AMQChannel } @Override + public void receiveMessageContent(final byte[] data) + { + + if(hasCurrentMessage()) + { + publishContentBody(new ContentBody(data)); + } + else + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame", + _channelId); + } + } + + @Override + public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize) + { + if(hasCurrentMessage()) + { + publishContentHeader(new ContentHeaderBody(properties, bodySize)); + } + else + { + _connection.closeConnection(AMQConstant.COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame", + _channelId); + } + } + + @Override + public boolean ignoreAllButCloseOk() + { + return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId); + } + + @Override public void receiveChannelFlow(final boolean active) { sync(); @@ -2270,9 +2291,15 @@ public class AMQChannel } @Override + public void receiveChannelFlowOk(final boolean active) + { + // TODO - should we do anything here? + } + + @Override public void receiveExchangeBound(final AMQShortString exchangeName, - final AMQShortString queueName, - final AMQShortString routingKey) + final AMQShortString routingKey, + final AMQShortString queueName) { VirtualHostImpl virtualHost = _connection.getVirtualHost(); MethodRegistry methodRegistry = _connection.getMethodRegistry(); |