summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java129
1 files changed, 78 insertions, 51 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 27fa654843..4087b1f4a0 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -115,7 +115,7 @@ import org.apache.qpid.transport.TransportException;
public class AMQChannel
implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
AsyncAutoCommitTransaction.FutureRecorder,
- ChannelMethodProcessor
+ ServerChannelMethodProcessor
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -376,27 +376,18 @@ public class AMQChannel
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
- throws AMQException
{
- if (_currentMessage == null)
+ if (_logger.isDebugEnabled())
{
- throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+ _logger.debug("Content header received on channel " + _channelId);
}
- else
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content header received on channel " + _channelId);
- }
- _currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setContentHeaderBody(contentHeaderBody);
- deliverCurrentMessageIfComplete();
- }
+ deliverCurrentMessageIfComplete();
}
private void deliverCurrentMessageIfComplete()
- throws AMQException
{
// check and deliver if header says body length is zero
if (_currentMessage.allContentReceived())
@@ -497,7 +488,7 @@ public class AMQChannel
* @throws AMQConnectionException if the message is mandatory close-on-no-route
* @see AMQProtocolEngine#isCloseWhenNoRoute()
*/
- private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
+ private void handleUnroutableMessage(AMQMessage message)
{
boolean mandatory = message.isMandatory();
String description = currentMessageDescription();
@@ -512,26 +503,27 @@ public class AMQChannel
if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
{
- throw new AMQConnectionException(
- AMQConstant.NO_ROUTE,
- "No route for message " + currentMessageDescription(),
- 0, 0, // default class and method ids
- getConnection().getMethodRegistry(),
- (Throwable) null);
- }
-
- if (mandatory || message.isImmediate())
- {
- _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message));
+ _connection.closeConnection(AMQConstant.NO_ROUTE,
+ "No route for message " + currentMessageDescription(), _channelId);
}
else
{
- AMQShortString exchangeName = _currentMessage.getExchangeName();
- AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
+ if (mandatory || message.isImmediate())
+ {
+ _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
+ "No Route for message "
+ + currentMessageDescription(),
+ message));
+ }
+ else
+ {
+ AMQShortString exchangeName = _currentMessage.getExchangeName();
+ AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
- getVirtualHost().getEventLogger().message(
- ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
- routingKey == null ? null : routingKey.asString()));
+ getVirtualHost().getEventLogger().message(
+ ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
+ routingKey == null ? null : routingKey.asString()));
+ }
}
}
@@ -550,13 +542,8 @@ public class AMQChannel
: _currentMessage.getMessagePublishInfo().getRoutingKey().toString());
}
- public void publishContentBody(ContentBody contentBody) throws AMQException
+ public void publishContentBody(ContentBody contentBody)
{
- if (_currentMessage == null)
- {
- throw new AMQException("Received content body without previously receiving a Content Header");
- }
-
if (_logger.isDebugEnabled())
{
_logger.debug(debugIdentity() + " content body received on channel " + _channelId);
@@ -568,13 +555,6 @@ public class AMQChannel
deliverCurrentMessageIfComplete();
}
- catch (AMQException e)
- {
- // we want to make sure we don't keep a reference to the message in the
- // event of an error
- _currentMessage = null;
- throw e;
- }
catch (RuntimeException e)
{
// we want to make sure we don't keep a reference to the message in the
@@ -1277,14 +1257,10 @@ public class AMQChannel
private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle)
- throws AMQException
{
AMQMessage message = new AMQMessage(handle, _connection.getReference());
- final BasicContentHeaderProperties properties =
- incomingMessage.getContentHeader().getProperties();
-
return message;
}
@@ -1340,6 +1316,11 @@ public class AMQChannel
return _subject;
}
+ public boolean hasCurrentMessage()
+ {
+ return _currentMessage != null;
+ }
+
private class GetDeliveryMethod implements ClientDeliveryMethod
{
@@ -2242,7 +2223,10 @@ public class AMQChannel
}
@Override
- public void receiveChannelClose()
+ public void receiveChannelClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
{
sync();
_connection.closeChannel(this);
@@ -2258,6 +2242,43 @@ public class AMQChannel
}
@Override
+ public void receiveMessageContent(final byte[] data)
+ {
+
+ if(hasCurrentMessage())
+ {
+ publishContentBody(new ContentBody(data));
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "Attempt to send a content header without first sending a publish frame",
+ _channelId);
+ }
+ }
+
+ @Override
+ public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+ {
+ if(hasCurrentMessage())
+ {
+ publishContentHeader(new ContentHeaderBody(properties, bodySize));
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "Attempt to send a content header without first sending a publish frame",
+ _channelId);
+ }
+ }
+
+ @Override
+ public boolean ignoreAllButCloseOk()
+ {
+ return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId);
+ }
+
+ @Override
public void receiveChannelFlow(final boolean active)
{
sync();
@@ -2270,9 +2291,15 @@ public class AMQChannel
}
@Override
+ public void receiveChannelFlowOk(final boolean active)
+ {
+ // TODO - should we do anything here?
+ }
+
+ @Override
public void receiveExchangeBound(final AMQShortString exchangeName,
- final AMQShortString queueName,
- final AMQShortString routingKey)
+ final AMQShortString routingKey,
+ final AMQShortString queueName)
{
VirtualHostImpl virtualHost = _connection.getVirtualHost();
MethodRegistry methodRegistry = _connection.getMethodRegistry();