summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-03-22 13:14:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-03-22 13:14:42 +0000
commit3fb9be28593263e12623ce09084a230b59b81f4f (patch)
tree8de74dd781802819df0ff1ca56aaa94fa1b9b38e /java/broker/src
parentb9f9c16645933e0e2f4c6c9b58e8cd1716434467 (diff)
downloadqpid-python-3fb9be28593263e12623ce09084a230b59b81f4f.tar.gz
made a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/java.multi_version@521253 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java65
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java57
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java62
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java288
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java266
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java33
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java57
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java62
46 files changed, 923 insertions, 513 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 7ceb3a7eef..be2cee79ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -433,7 +433,10 @@ public class AMQChannel
}
- /** Called to resend all outstanding unacknowledged messages to this same channel. */
+ /** Called to resend all outstanding unacknowledged messages to this same channel.
+ * @param session the session
+ * @param requeue if true then requeue, else resend
+ * @throws org.apache.qpid.AMQException */
public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
@@ -752,7 +755,9 @@ public class AMQChannel
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
- message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage()));
+ session.getProtocolOutputConverter().writeReturn(message, _channelId,
+ bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 42fe8c5274..a48bc5df7f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -45,7 +45,7 @@ import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
@@ -59,7 +59,8 @@ import org.apache.qpid.url.URLSyntaxException;
* Main entry point for AMQPD.
*
*/
-public class Main implements ProtocolVersionList
+@SuppressWarnings({"AccessStaticViaInstance"})
+public class Main
{
private static final Logger _logger = Logger.getLogger(Main.class);
@@ -143,12 +144,21 @@ public class Main implements ProtocolVersionList
else if (commandLine.hasOption("v"))
{
String ver = "Qpid 0.9.0.0";
- String protocol = "AMQP version(s) [major.minor]: ";
- for (int i=0; i<pv.length; i++)
+ StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
+
+ boolean first = true;
+ for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
{
- if (i > 0)
- protocol += ", ";
- protocol += pv[i][PROTOCOL_MAJOR] + "." + pv[i][PROTOCOL_MINOR];
+ if(first)
+ {
+ first = false;
+ }
+ else
+ {
+ protocol.append(", ");
+ }
+ protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
+
}
System.out.println(ver + " (" + protocol + ")");
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index fdf087fdea..99cc60011a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -209,7 +209,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
if(consumerTag != null)
{
- msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
index 348bfa5e68..bdabcbf5be 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -25,7 +25,8 @@ import java.util.HashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQMessage;
/**
@@ -63,8 +64,9 @@ public class PropertyExpression implements Expression
{
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
- return _properties.getReplyTo();
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ AMQShortString replyTo = _properties.getReplyTo();
+ return replyTo == null ? null : replyTo.toString();
}
catch (AMQException e)
{
@@ -83,8 +85,9 @@ public class PropertyExpression implements Expression
{
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
- return _properties.getType();
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ AMQShortString type = _properties.getType();
+ return type == null ? null : type.toString();
}
catch (AMQException e)
{
@@ -126,7 +129,7 @@ public class PropertyExpression implements Expression
{
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
return (int) _properties.getPriority();
}
catch (AMQException e)
@@ -147,8 +150,9 @@ public class PropertyExpression implements Expression
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
- return _properties.getMessageId();
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ AMQShortString messageId = _properties.getMessageId();
+ return messageId == null ? null : messageId;
}
catch (AMQException e)
{
@@ -168,7 +172,7 @@ public class PropertyExpression implements Expression
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
return _properties.getTimestamp();
}
catch (AMQException e)
@@ -189,8 +193,9 @@ public class PropertyExpression implements Expression
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
- return _properties.getCorrelationId();
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ AMQShortString correlationId = _properties.getCorrelationId();
+ return correlationId == null ? null : correlationId.toString();
}
catch (AMQException e)
{
@@ -210,7 +215,7 @@ public class PropertyExpression implements Expression
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
return _properties.getExpiration();
}
catch (AMQException e)
@@ -254,7 +259,7 @@ public class PropertyExpression implements Expression
else
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
if(_logger.isDebugEnabled())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
index f93b2b25e6..269b68ff6b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
@@ -61,6 +61,6 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB
}
// this method throws an AMQException if the delivery tag is not known
- channel.acknowledgeMessage(body.deliveryTag, body.multiple);
+ channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index 7d18043f5c..c73cc73d23 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -55,15 +55,15 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
throw body.getChannelNotFoundException(evt.getChannelId());
}
- channel.unsubscribeConsumer(protocolSession, body.consumerTag);
- if (!body.nowait)
+ channel.unsubscribeConsumer(protocolSession, body.getConsumerTag());
+ if (!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(),
(byte) 8, (byte) 0, // AMQP version (major, minor)
- body.consumerTag); // consumerTag
+ body.getConsumerTag()); // consumerTag
protocolSession.writeFrame(responseFrame);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index da61f2ffd5..e96f8e8cba 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -68,14 +68,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
if (queue == null)
{
- _log.info("No queue for '" + body.queue + "'");
- if (body.queue != null)
+ _log.info("No queue for '" + body.getQueue() + "'");
+ if (body.getQueue() != null)
{
- String msg = "No such queue, '" + body.queue + "'";
+ String msg = "No such queue, '" + body.getQueue() + "'";
throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
@@ -88,9 +88,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
try
{
- AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
- body.arguments, body.noLocal, body.exclusive);
- if (!body.nowait)
+ AMQShortString consumerTag = channel.subscribeToQueue(body.getConsumerTag(), queue, session, !body.getNoAck(),
+ body.getArguments(), body.getNoLocal(), body.getExclusive());
+ if (!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -110,9 +110,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
catch (ConsumerTagNotUniqueException e)
{
- AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
+ AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Non-unique consumer tag, '" + body.consumerTag + "'");
+ "Non-unique consumer tag, '" + body.getConsumerTag() + "'");
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index b88c2ebf3a..7c65d94ceb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -43,15 +43,15 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
if (queue == null)
{
- _log.info("No queue for '" + body.queue + "'");
- if(body.queue!=null)
+ _log.info("No queue for '" + body.getQueue() + "'");
+ if(body.getQueue()!=null)
{
throw body.getConnectionException(AMQConstant.NOT_FOUND,
- "No such queue, '" + body.queue + "'");
+ "No such queue, '" + body.getQueue() + "'");
}
else
{
@@ -61,7 +61,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
- if(!queue.performGet(session, channel, !body.noAck))
+ if(!queue.performGet(session, channel, !body.getNoAck()))
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 67ade0a744..2c1f99f7c4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
@@ -61,14 +62,15 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
_log.debug("Publish received on channel " + evt.getChannelId());
}
+ AMQShortString exchangeName = body.getExchange();
// TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
- if (body.exchange == null)
+ if (exchangeName == null)
{
- body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+ exchangeName = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
}
VirtualHost vHost = session.getVirtualHost();
- Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange);
+ Exchange e = vHost.getExchangeRegistry().getExchange(exchangeName);
// if the exchange does not exist we raise a channel exception
if (e == null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
index 3cd6a87f64..f9afbaa954 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -47,8 +47,8 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
}
- channel.setPrefetchCount(evt.getMethod().prefetchCount);
- channel.setPrefetchSize(evt.getMethod().prefetchSize);
+ channel.setPrefetchCount(evt.getMethod().getPrefetchCount());
+ channel.setPrefetchSize(evt.getMethod().getPrefetchSize());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index bc11e4652c..601187d2cd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -54,7 +54,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
throw body.getChannelNotFoundException(evt.getChannelId());
}
- channel.resend(session, body.requeue);
+ channel.resend(session, body.getRequeue());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index ed13092ded..4a7e99a219 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -49,10 +49,10 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
AMQProtocolSession session = stateManager.getProtocolSession();
- _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue);
+ _logger.info("FIXME: Rejecting:" + evt.getMethod().getDeliveryTag() + ": Requeue:" + evt.getMethod().getRequeue());
int channelId = evt.getChannelId();
- UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag);
+ UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().getDeliveryTag());
_logger.info("Need to reject message:" + message);
// if (evt.getMethod().requeue)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 9a8fce7129..8bee8d3992 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -51,8 +51,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
AMQProtocolSession session = stateManager.getProtocolSession();
ChannelCloseBody body = evt.getMethod();
- _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
- " and method " + body.methodId);
+ _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.getClassId() +
+ " and method " + body.getMethodId());
int channelId = evt.getChannelId();
AMQChannel channel = session.getChannel(channelId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index bfa170cfc5..35e1dd68f5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -58,15 +58,15 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
throw body.getChannelNotFoundException(evt.getChannelId());
}
- channel.setSuspended(!body.active);
- _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
+ channel.setSuspended(!body.getActive());
+ _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.getActive());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
(byte)8, (byte)0, // AMQP version (major, minor)
- body.active); // active
+ body.getActive()); // active
session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 03fc7a3926..50125c02be 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.amqp_8_0.ChannelOpenOkBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -52,10 +53,15 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
virtualHost.getExchangeRegistry());
session.addChannel(channel);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- session.writeFrame(response);
+
+ ChannelOpenOkBody channelOpenOkBody = createChannelOpenOkBody();
+
+
+ session.writeFrame(new AMQFrame(evt.getChannelId(), channelOpenOkBody));
+ }
+
+ private ChannelOpenOkBody createChannelOpenOkBody()
+ {
+ return new ChannelOpenOkBodyImpl();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index 21da03d226..6981fbc72c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -49,8 +49,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
{
AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionCloseBody body = evt.getMethod();
- _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
- body.replyText + " for " + session);
+ _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
+ body.getReplyText() + " for " + session);
try
{
session.closeSession();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index a85af61327..80e4c29577 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -58,13 +58,13 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
//ignore leading '/'
String virtualHostName;
- if((body.virtualHost != null) && body.virtualHost.charAt(0) == '/')
+ if((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
{
- virtualHostName = new StringBuilder(body.virtualHost.subSequence(1,body.virtualHost.length())).toString();
+ virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1,body.getVirtualHost().length())).toString();
}
else
{
- virtualHostName = body.virtualHost == null ? null : String.valueOf(body.virtualHost);
+ virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
}
VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
@@ -90,7 +90,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
(byte)8, (byte)0, // AMQP version (major, minor)
- body.virtualHost);
+ body.getVirtualHost());
stateManager.changeState(AMQState.CONNECTION_OPEN);
session.writeFrame(response);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index 4ad6dcde71..9f7a1c1e7c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -30,6 +30,10 @@ import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl;
+import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl;
+import org.apache.qpid.framing.amqp_8_0.ConnectionTuneBodyImpl;
+import org.apache.qpid.framing.amqp_8_0.ConnectionSecureBodyImpl;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -68,7 +72,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
throw new AMQException("No SASL context set up in session");
}
- AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
switch (authResult.status)
{
case ERROR:
@@ -76,44 +80,32 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
// throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
_logger.info("Authentication failed");
stateManager.changeState(AMQState.CONNECTION_CLOSING);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- ConnectionCloseBody.getClazz((byte)8, (byte)0), // classId
- ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- AMQConstant.NOT_ALLOWED.getName()); // replyText
- session.writeFrame(close);
+
+ ConnectionCloseBody closeBody =
+ new ConnectionCloseBodyImpl(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName(), // replyText)
+ ConnectionSecureOkBodyImpl.CLASS_ID, // classId
+ ConnectionSecureOkBodyImpl.METHOD_ID); // methodId
+
+ session.writeFrame(new AMQFrame(0,closeBody));
disposeSaslServer(session);
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- // TODO: Check the value of channelMax here: This should be the max
- // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
- // not Integer.MAX_VALUE (which is signed 4 bytes).
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- Integer.MAX_VALUE, // channelMax
- ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
- HeartbeatConfig.getInstance().getDelay()); // heartbeat
- session.writeFrame(tune);
+
+ ConnectionTuneBody tuneBody =
+ new ConnectionTuneBodyImpl(ConnectionStartOkMethodHandler.getConfiguredMaxChannels(),
+ ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
+ HeartbeatConfig.getInstance().getDelay());
+ session.writeFrame(new AMQFrame(0,tuneBody));
disposeSaslServer(session);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- authResult.challenge); // challenge
- session.writeFrame(challenge);
+ ConnectionSecureBody secureBody =
+ new ConnectionSecureBodyImpl(authResult.challenge);
+ session.writeFrame(new AMQFrame(0,secureBody));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 65b79cf8e7..b14c8a5c81 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -48,6 +48,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
private static ConnectionStartOkMethodHandler _instance = new ConnectionStartOkMethodHandler();
private static final int DEFAULT_FRAME_SIZE = 65536;
+ private static final int DEFAULT_CHANNEL_MAX = 65536;
public static StateAwareMethodListener<ConnectionStartOkBody> getInstance()
{
@@ -62,23 +63,23 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
{
AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionStartOkBody body = evt.getMethod();
- _logger.info("SASL Mechanism selected: " + body.mechanism);
- _logger.info("Locale selected: " + body.locale);
+ _logger.info("SASL Mechanism selected: " + body.getMechanism());
+ _logger.info("Locale selected: " + body.getLocale());
AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
SaslServer ss = null;
try
{
- ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
+ ss = authMgr.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN());
session.setSaslServer(ss);
- AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
//save clientProperties
if (session.getClientProperties() == null)
{
- session.setClientProperties(body.clientProperties);
+ session.setClientProperties(body.getClientProperties());
}
switch (authResult.status)
@@ -141,6 +142,15 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
_logger.info("Framesize set to " + framesize);
return framesize;
}
+
+
+ static int getConfiguredMaxChannels()
+ {
+ final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
+ final int maxChannels = config.getInt("advanced.maxchannels", DEFAULT_CHANNEL_MAX);
+ _logger.info("Max Channels set to " + maxChannels);
+ return maxChannels;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
index ab7695955c..8a6b518934 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
@@ -49,6 +49,6 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C
_logger.debug(body);
}
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- session.initHeartbeats(body.heartbeat);
+ session.initHeartbeats(body.getHeartbeat());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 2b123bcb2d..131d047c28 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -22,6 +22,7 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeBoundBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.amqp_8_0.ExchangeBoundOkBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -74,9 +75,9 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
ExchangeBoundBody body = evt.getMethod();
- AMQShortString exchangeName = body.exchange;
- AMQShortString queueName = body.queue;
- AMQShortString routingKey = body.routingKey;
+ AMQShortString exchangeName = body.getExchange();
+ AMQShortString queueName = body.getQueue();
+ AMQShortString routingKey = body.getRoutingKey();
if (exchangeName == null)
{
throw new AMQException("Exchange exchange must not be null");
@@ -86,8 +87,8 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
if (exchange == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
EXCHANGE_NOT_FOUND, // replyCode
new AMQShortString("Exchange " + exchangeName + " not found")); // replyText
}
@@ -98,16 +99,16 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
if (exchange.hasBindings())
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
OK, // replyCode
null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
NO_BINDINGS, // replyCode
null); // replyText
}
@@ -119,8 +120,8 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
QUEUE_NOT_FOUND, // replyCode
new AMQShortString("Queue " + queueName + " not found")); // replyText
}
@@ -129,16 +130,16 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
if (exchange.isBound(queue))
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
OK, // replyCode
null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
QUEUE_NOT_BOUND, // replyCode
new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName)); // replyText
}
@@ -151,52 +152,58 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
QUEUE_NOT_FOUND, // replyCode
new AMQShortString("Queue " + queueName + " not found")); // replyText
}
else
{
- if (exchange.isBound(body.routingKey, queue))
+ if (exchange.isBound(body.getRoutingKey(), queue))
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
OK, // replyCode
null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
new AMQShortString("Queue " + queueName + " not bound with routing key " +
- body.routingKey + " to exchange " + exchangeName)); // replyText
+ body.getRoutingKey() + " to exchange " + exchangeName)); // replyText
}
}
}
else
{
- if (exchange.isBound(body.routingKey))
+ if (exchange.isBound(body.getRoutingKey()))
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
OK, // replyCode
null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
NO_QUEUE_BOUND_WITH_RK, // replyCode
- new AMQShortString("No queue bound with routing key " + body.routingKey +
+ new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
" to exchange " + exchangeName)); // replyText
}
}
session.writeFrame(response);
}
+
+ private AMQFrame createExchangeBoundResponseFrame(int channelId, int replyCode, AMQShortString replyText)
+ {
+ ExchangeBoundOkBody okBody = new ExchangeBoundOkBodyImpl(replyCode,replyText);
+ return new AMQFrame(channelId,okBody);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index be3ffcc698..00c279f133 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -64,43 +64,43 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
final ExchangeDeclareBody body = evt.getMethod();
if (_logger.isDebugEnabled())
{
- _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
+ _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange());
}
synchronized(exchangeRegistry)
{
- Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+ Exchange exchange = exchangeRegistry.getExchange(body.getExchange());
if (exchange == null)
{
- if(body.passive && ((body.type == null) || body.type.length() ==0))
+ if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
}
else
{
try
{
- exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
- body.passive, body.ticket);
+ exchange = exchangeFactory.createExchange(body.getExchange(), body.getType(), body.getDurable(),
+ body.getPassive(), body.getTicket());
exchangeRegistry.registerExchange(exchange);
}
catch(AMQUnknownExchangeType e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e);
}
}
}
- else if (!exchange.getType().equals(body.type))
+ else if (!exchange.getType().equals(body.getType()))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
}
}
- if(!body.nowait)
+ if(!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index f9926c399c..ebae8279d9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -54,7 +54,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
ExchangeDeleteBody body = evt.getMethod();
try
{
- exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
+ exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 4dc67b1970..2451aa2fa7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
@@ -61,8 +62,10 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
final QueueBindBody body = evt.getMethod();
+
+ AMQShortString routingKey = body.getRoutingKey();
final AMQQueue queue;
- if (body.queue == null)
+ if (body.getQueue() == null)
{
AMQChannel channel = session.getChannel(evt.getChannelId());
@@ -77,33 +80,35 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
}
-
- if (body.routingKey == null)
+
+
+
+ if (routingKey == null)
{
- body.routingKey = queue.getName();
+ routingKey = queue.getName();
}
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
- final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
}
try
{
- queue.bind(body.routingKey, body.arguments, exch);
+ queue.bind(routingKey, body.getArguments(), exch);
}
catch (AMQInvalidRoutingKeyException rke)
{
- throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString());
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
}
catch (AMQException e)
{
@@ -112,9 +117,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
if (_log.isInfoEnabled())
{
- _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
}
- if (!body.nowait)
+ if (!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 8b2467f47d..5a49368c1c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -78,10 +78,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
QueueDeclareBody body = evt.getMethod();
+ AMQShortString queueName = body.getQueue();
+
// if we aren't given a queue name, we create one which we return to the client
- if (body.queue == null)
+ if (body.getQueue() == null)
{
- body.queue = createName();
+ queueName = createName();
}
AMQQueue queue;
@@ -90,11 +92,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
synchronized (queueRegistry)
{
- if (((queue = queueRegistry.getQueue(body.queue)) == null) )
+ if (((queue = queueRegistry.getQueue(queueName)) == null) )
{
- if(body.passive)
+ if(body.getPassive())
{
- String msg = "Queue: " + body.queue + " not found.";
+ String msg = "Queue: " + queueName + " not found.";
throw body.getChannelException(AMQConstant.NOT_FOUND,msg );
}
else
@@ -109,8 +111,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
{
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
- queue.bind(body.queue, null, defaultExchange);
- _log.info("Queue " + body.queue + " bound to default exchange");
+ queue.bind(queueName, null, defaultExchange);
+ _log.info("Queue " + body.getQueue() + " bound to default exchange");
}
}
}
@@ -130,7 +132,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
channel.setDefaultQueue(queue);
}
- if (!body.nowait)
+ if (!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -139,8 +141,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
(byte)8, (byte)0, // AMQP version (major, minor)
queue.getConsumerCount(), // consumerCount
queue.getMessageCount(), // messageCount
- body.queue); // queue
- _log.info("Queue " + body.queue + " declared successfully");
+ queueName); // queue
+ _log.info("Queue " + queueName + " declared successfully");
session.writeFrame(response);
}
}
@@ -159,11 +161,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
throws AMQException
{
final QueueRegistry registry = virtualHost.getQueueRegistry();
- AMQShortString owner = body.exclusive ? session.getContextKey() : null;
- final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+ AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+ final AMQQueue queue = new AMQQueue(body.getQueue(), body.getDurable(), owner, body.getAutoDelete(), virtualHost);
final AMQShortString queueName = queue.getName();
- if(body.exclusive && !body.durable)
+ if(body.getExclusive() && !body.getDurable())
{
final AMQProtocolSession.Task deleteQueueTask =
new AMQProtocolSession.Task()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 0c7de312a7..97675e8731 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -65,7 +65,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
QueueDeleteBody body = evt.getMethod();
AMQQueue queue;
- if(body.queue == null)
+ if(body.getQueue() == null)
{
AMQChannel channel = session.getChannel(evt.getChannelId());
@@ -79,31 +79,31 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
}
else
{
- if(body.ifEmpty && !queue.isEmpty())
+ if(body.getIfEmpty() && !queue.isEmpty())
{
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty." );
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty." );
}
- else if(body.ifUnused && !queue.isUnused())
+ else if(body.getIfUnused() && !queue.isUnused())
{
// TODO - Error code
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used." );
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used." );
}
else
{
- int purged = queue.delete(body.ifUnused, body.ifEmpty);
+ int purged = queue.delete(body.getIfUnused(), body.getIfEmpty());
store.removeQueue(queue.getName());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index 0c00436470..e380bb3770 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -44,7 +44,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
- if(body.queue == null)
+ if(body.getQueue() == null)
{
if (channel == null)
@@ -65,14 +65,14 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
}
else
@@ -80,7 +80,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
long purged = queue.clearQueue(channel.getStoreContext());
- if(!body.nowait)
+ if(!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index a10f44f906..18b09c00a7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.amqp_8_0.TxRollbackBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -56,10 +58,10 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
}
channel.rollback();
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
+ TxRollbackBody rollbackBody = createTxRollbackBody();
+ session.writeFrame(new AMQFrame(evt.getChannelId(), rollbackBody));
+
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
// Why, are we not allowed to send messages back to client before the ok method?
@@ -70,4 +72,9 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
}
+
+ private TxRollbackBody createTxRollbackBody()
+ {
+ return new TxRollbackBodyImpl();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
index a9e478e301..bd8f29526d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.amqp_8_0.TxSelectBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
@@ -55,9 +57,12 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
channel.setLocalTransactional();
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+ TxSelectBody txSelect = createTxSelectBody();
+ session.writeFrame(new AMQFrame(evt.getChannelId(), txSelect));
+ }
+
+ private TxSelectBody createTxSelectBody()
+ {
+ return new TxSelectBodyImpl();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
new file mode 100644
index 0000000000..e01c5aabbf
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.AMQException;
+
+public interface ProtocolOutputConverter
+{
+ void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
+
+ interface Factory
+ {
+ ProtocolOutputConverter newInstance(AMQProtocolSession session);
+ }
+
+ void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException;
+
+ void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+
+ byte getProtocolMinorVersion();
+
+ byte getProtocolMajorVersion();
+
+ void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException;
+
+ void writeFrame(AMQDataBlock block);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
new file mode 100644
index 0000000000..8366c426dd
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.output.ProtocolOutputConverter.Factory;
+import org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public class ProtocolOutputConverterRegistry
+{
+
+ private static final Map<Byte, Map<Byte, Factory>> _registry =
+ new HashMap<Byte, Map<Byte, Factory>>();
+
+
+ static
+ {
+ register((byte) 8, (byte) 0, ProtocolOutputConverterImpl.getInstanceFactory());
+ }
+
+ private static void register(byte major, byte minor, Factory converter)
+ {
+ if(!_registry.containsKey(major))
+ {
+ _registry.put(major, new HashMap<Byte, Factory>());
+ }
+ _registry.get(major).put(minor, converter);
+ }
+
+
+ public static ProtocolOutputConverter getConverter(AMQProtocolSession session)
+ {
+ return _registry.get(session.getProtocolMajorVersion()).get(session.getProtocolMinorVersion()).newInstance(session);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
new file mode 100644
index 0000000000..bd5bb632fe
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -0,0 +1,288 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Iterator;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+ final long messageId = message.getMessageId();
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext,messageId, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+ final long messageId = message.getMessageId();
+
+ ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext, messageId, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ consumerTag,
+ deliveryTag, pb.getExchange(), messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ {
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ message.getMessagePublishInfo().getExchange(),
+ replyCode, replyText,
+ message.getMessagePublishInfo().getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+ ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+
+ writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ writeFrame(bodyFrameIterator.next());
+ }
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ writeFrame(BasicCancelOkBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ consumerTag // consumerTag
+ ));
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 2de32c2f0f..587cb3b2aa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -43,25 +43,15 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MainRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
-import org.apache.qpid.framing.VersionSpecificRegistry;
-import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.ConnectionStartBodyImpl;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -71,7 +61,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public class AMQMinaProtocolSession implements AMQProtocolSession,
- ProtocolVersionList,
Managable
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -111,12 +100,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private long _maxNoOfChannels = 1000;
/* AMQP Version for this session */
- private byte _major = pv[pv.length - 1][PROTOCOL_MAJOR];
- private byte _minor = pv[pv.length - 1][PROTOCOL_MINOR];
+ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
+
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length - 1][PROTOCOL_MAJOR], pv[pv.length - 1][PROTOCOL_MINOR]);
+ private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
private List<Integer> _closingChannelsList = new ArrayList<Integer>();
+ private ProtocolOutputConverter _protocolOutputConverter;
public ManagedObject getManagedObject()
@@ -195,86 +185,123 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_lastReceived = message;
if (message instanceof ProtocolInitiation)
{
- ProtocolInitiation pi = (ProtocolInitiation) message;
- // this ensures the codec never checks for a PI message again
- ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
- try
- {
- pi.checkVersion(this); // Fails if not correct
+ protocolInitiationReceived((ProtocolInitiation) message);
- // This sets the protocol version (and hence framing classes) for this session.
- setProtocolVersion(pi.protocolMajor, pi.protocolMinor);
+ }
+ else if (message instanceof AMQFrame)
+ {
+ AMQFrame frame = (AMQFrame) message;
+ frameReceived(frame);
- String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+ }
+ else
+ {
+ throw new UnknnownMessageTypeException(message);
+ }
+ }
- String locales = "en_US";
+ private void frameReceived(AMQFrame frame)
+ throws AMQException
+ {
+ int channelId = frame.getChannel();
+ AMQBody body = frame.getBodyFrame();
- // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
- _major, _minor, // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short) _major, // versionMajor
- (short) _minor); // versionMinor
- _minaProtocolSession.write(response);
- }
- catch (AMQException e)
- {
- _logger.error("Received incorrect protocol initiation", e);
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be used
- here. */
- int i = pv.length - 1;
- _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
- // TODO: Close connection (but how to wait until message is sent?)
- // ritchiem 2006-12-04 will this not do?
-// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
-// future.join();
-// close connection
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Frame Received: " + frame);
+ }
- }
+ if (body instanceof AMQMethodBodyImpl)
+ {
+ methodFrameReceived(channelId, (AMQMethodBodyImpl)body);
+ }
+ else if (body instanceof ContentHeaderBody)
+ {
+ contentHeaderReceived(channelId, (ContentHeaderBody)body);
+ }
+ else if (body instanceof ContentBody)
+ {
+ contentBodyReceived(channelId, (ContentBody)body);
+ }
+ else if (body instanceof HeartbeatBody)
+ {
+ // NO OP
}
else
{
- AMQFrame frame = (AMQFrame) message;
-
- if (frame.getBodyFrame() instanceof AMQMethodBody)
- {
- methodFrameReceived(frame);
- }
- else
- {
- contentFrameReceived(frame);
- }
+ _logger.warn("Unrecognised frame " + frame.getClass().getName());
}
}
- private void methodFrameReceived(AMQFrame frame)
+ private void protocolInitiationReceived(ProtocolInitiation pi)
{
- if (_logger.isDebugEnabled())
+ // this ensures the codec never checks for a PI message again
+ ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ try
+ {
+ pi.checkVersion(); // Fails if not correct
+
+ // This sets the protocol version (and hence framing classes) for this session.
+ setProtocolVersion(pi._protocolMajor, pi._protocolMinor);
+
+ String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+
+ String locales = "en_US";
+
+ ConnectionStartBody connectionStartBody = createConnectionStartBody(pi,
+ locales.getBytes(),
+ mechanisms.getBytes(),
+ null);
+
+ // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
+ AMQFrame response = new AMQFrame((short) 0,connectionStartBody); // versionMinor
+ _minaProtocolSession.write(response);
+ }
+ catch (AMQException e)
{
- _logger.debug("Method frame received: " + frame);
+ _logger.error("Received incorrect protocol initiation", e);
+
+ _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+
+ // TODO: Close connection (but how to wait until message is sent?)
+ // ritchiem 2006-12-04 will this not do?
+// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
+// future.join();
+// close connection
+
}
+ }
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
- (AMQMethodBody) frame.getBodyFrame());
+ private ConnectionStartBody createConnectionStartBody(ProtocolInitiation pi,
+ byte[] locales,
+ byte[] mechanisms,
+ FieldTable serverProperties)
+ {
+ return new ConnectionStartBodyImpl(pi._protocolMajor, pi._protocolMinor,serverProperties,mechanisms,locales);
+ }
+
+
+ private void methodFrameReceived(int channelId, AMQMethodBodyImpl methodBody)
+ {
+
+ final AMQMethodEvent<AMQMethodBodyImpl> evt = new AMQMethodEvent<AMQMethodBodyImpl>(channelId,
+ methodBody);
//Check that this channel is not closing
- if (channelAwaitingClosure(frame.getChannel()))
+ if (channelAwaitingClosure(channelId))
{
if ((evt.getMethod() instanceof ChannelCloseOkBody))
{
if (_logger.isInfoEnabled())
{
- _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok");
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
}
}
else
{
if (_logger.isInfoEnabled())
{
- _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring");
+ _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
}
return;
}
@@ -298,19 +325,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ throw new AMQNoMethodHandlerException(evt);
}
}
catch (AMQChannelException e)
{
- if (getChannel(frame.getChannel()) != null)
+ if (getChannel(channelId) != null)
{
if (_logger.isInfoEnabled())
{
_logger.info("Closing channel due to: " + e.getMessage());
}
- writeFrame(e.getCloseFrame(frame.getChannel()));
- closeChannel(frame.getChannel());
+ writeFrame(e.getCloseFrame(channelId));
+ closeChannel(channelId);
}
else
{
@@ -328,7 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
AMQConstant.CHANNEL_ERROR.getName().toString());
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(ce.getCloseFrame(frame.getChannel()));
+ writeFrame(ce.getCloseFrame(channelId));
}
}
catch (AMQConnectionException e)
@@ -339,7 +366,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(frame.getChannel()));
+ writeFrame(e.getCloseFrame(channelId));
}
}
catch (Exception e)
@@ -353,61 +380,21 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- private void contentFrameReceived(AMQFrame frame) throws AMQException
- {
- if (frame.getBodyFrame() instanceof ContentHeaderBody)
- {
- contentHeaderReceived(frame);
- }
- else if (frame.getBodyFrame() instanceof ContentBody)
- {
- contentBodyReceived(frame);
- }
- else if (frame.getBodyFrame() instanceof HeartbeatBody)
- {
- _logger.debug("Received heartbeat from client");
- }
- else
- {
- _logger.warn("Unrecognised frame " + frame.getClass().getName());
- }
- }
- private void contentHeaderReceived(AMQFrame frame) throws AMQException
+ private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content header frame received: " + frame);
- }
- AMQChannel channel = getChannel(frame.getChannel());
+ AMQChannel channel = getAndAssertChannel(channelId);
+
+ channel.publishContentHeader(body);
- if (channel == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
- }
- else
- {
- channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
- }
}
- private void contentBodyReceived(AMQFrame frame) throws AMQException
+ private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content body frame received: " + frame);
- }
- AMQChannel channel = getChannel(frame.getChannel());
+ AMQChannel channel = getAndAssertChannel(channelId);
- if (channel == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
- }
- else
- {
- channel.publishContentBody((ContentBody) frame.getBodyFrame(), this);
- }
+ channel.publishContentBody(body, this);
}
/**
@@ -437,6 +424,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return new ArrayList<AMQChannel>(_channelMap.values());
}
+ public AMQChannel getAndAssertChannel(int channelId) throws AMQException
+ {
+ AMQChannel channel = getChannel(channelId);
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+ }
+ return channel;
+ }
+
public AMQChannel getChannel(int channelId) throws AMQException
{
if (channelAwaitingClosure(channelId))
@@ -685,24 +682,26 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private void setProtocolVersion(byte major, byte minor)
{
- _major = major;
- _minor = minor;
- _registry = MainRegistry.getVersionSpecificRegistry(major, minor);
+ _protocolVersion = new ProtocolVersion(major,minor);
+
+ _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
+
+ _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
}
public byte getProtocolMajorVersion()
{
- return _major;
+ return _protocolVersion.getMajorVersion();
}
public byte getProtocolMinorVersion()
{
- return _minor;
+ return _protocolVersion.getMinorVersion();
}
public boolean isProtocolVersion(byte major, byte minor)
{
- return _major == major && _minor == minor;
+ return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor;
}
public VersionSpecificRegistry getRegistry()
@@ -739,5 +738,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_taskList.remove(task);
}
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return _protocolOutputConverter;
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
new file mode 100644
index 0000000000..82f6e96906
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.framing.AMQMethodBodyImpl;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+
+public class AMQNoMethodHandlerException extends AMQException
+{
+
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBodyImpl> evt)
+ {
+ super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 9d397505dc..756a8b5ebe 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -39,7 +39,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ConnectorConfiguration;
@@ -53,7 +53,7 @@ import org.apache.qpid.ssl.SSLContextFactory;
* the state for the connection.
*
*/
-public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList
+public class AMQPFastProtocolHandler extends IoHandlerAdapter
{
private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
@@ -162,12 +162,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
if (throwable instanceof AMQProtocolHeaderException)
{
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be returned
- here. */
- int i = pv.length - 1;
- protocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+
+ protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+
protocolSession.close();
+
_logger.error("Error in protocol initiation " + session + ": " + throwable.getMessage(), throwable);
}
else if(throwable instanceof IOException)
@@ -176,8 +175,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
}
else
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
session.getProtocolMajorVersion(),
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 503dc8b554..4cfee06850 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -162,4 +163,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
void removeSessionCloseTask(Task task);
+ public ProtocolOutputConverter getProtocolOutputConverter();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index ea89136a62..50cbc35266 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -24,6 +24,7 @@ import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
+import javax.management.NotCompliantMBeanException;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -39,6 +40,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.AMQManagedObject;
@@ -65,7 +67,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
new AMQShortString("Broker Management Console has closed the connection.");
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException
+ public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
{
super(ManagedConnection.class, ManagedConnection.TYPE);
_session = session;
@@ -74,6 +76,8 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
_name = jmxEncode(new StringBuffer(remote), 0).toString();
init();
}
+
+
static
{
try
@@ -94,7 +98,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
{
_channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
- _channelAtttibuteNames, _channelAttributeTypes);
+ _channelAtttibuteNames, _channelAttributeTypes);
_channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
}
@@ -215,18 +219,11 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
*/
public void closeConnection() throws JMException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
- _session.getProtocolMajorVersion(),
- _session.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ ConnectionCloseBody connectionCloseBody =
+ createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText
);
- _session.writeFrame(response);
+ _session.writeFrame(new AMQFrame(0,connectionCloseBody));
try
{
@@ -238,6 +235,11 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
}
}
+ private ConnectionCloseBody createConnectionCloseBody(int code, AMQShortString replyText)
+ {
+ return new ConnectionCloseBodyImpl(code,replyText,0,0);
+ }
+
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
new file mode 100644
index 0000000000..45d09e8f3e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.AMQException;
+
+public class UnknnownMessageTypeException extends AMQException
+{
+ public UnknnownMessageTypeException(AMQDataBlock message)
+ {
+ super("Unknown message type: " + message.getClass().getName() + ": " + message);
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index aa7ea16afc..32873c3cf5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,23 +20,34 @@
*/
package org.apache.qpid.server.queue;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBodyImpl;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.txn.TransactionalContext;
/** Combines the information that make up a deliverable message into a more manageable form. */
+
+import org.apache.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Combines the information that make up a deliverable message into a more manageable form.
+ */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -114,7 +125,7 @@ public class AMQMessage
try
{
- AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+ AMQBodyImpl cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
return new AMQFrame(_channel, cb);
}
catch (AMQException e)
@@ -136,7 +147,7 @@ public class AMQMessage
}
}
- private StoreContext getStoreContext()
+ public StoreContext getStoreContext()
{
return _txnContext.getStoreContext();
}
@@ -579,6 +590,7 @@ public class AMQMessage
}
}
+/*
public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
@@ -746,6 +758,12 @@ public class AMQMessage
protocolSession.writeFrame(bodyFrameIterator.next());
}
}
+*/
+
+ public AMQMessageHandle getMessageHandle()
+ {
+ return _messageHandle;
+ }
public long getSize()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 4fd89f39da..c9329a244c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -41,9 +41,9 @@ import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -344,12 +344,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
try
{
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
+ CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
String mimeType = null, encoding = null;
if (headerProperties != null)
{
- mimeType = headerProperties.getContentType();
- encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+ mimeType = mimeTypeShortSting == null ? null : mimeTypeShortSting.toString();
+ encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding().toString();
}
Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
@@ -382,7 +383,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
AMQMessage msg = list.get(i - 1);
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) headerBody.properties;
String[] headerAttributes = headerProperties.toString().split(",");
Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 208a59516c..e70926736d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -270,7 +270,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
}
- msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount());
+ protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(),
+ deliveryTag, _queue.getMessageCount());
_totalMessageSize.addAndGet(-msg.getSize());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index ede7731a06..0a2e73880c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -29,9 +29,9 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -258,7 +258,7 @@ public class SubscriptionImpl implements Subscription
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
- msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
}
}
@@ -294,7 +294,7 @@ public class SubscriptionImpl implements Subscription
msg.decrementReference(storeContext);
}
- msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
}
}
@@ -466,13 +466,9 @@ public class SubscriptionImpl implements Subscription
if (_autoClose && !_sentClose)
{
_logger.info("Closing autoclose subscription:" + this);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- consumerTag // consumerTag
- ));
+ ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
+
_sentClose = true;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index d12f5cd084..73401ee664 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -27,35 +27,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicGetBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.handler.BasicAckMethodHandler;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
index e3af0bc486..2f28a3125d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
deleted file mode 100644
index 90aa7bb998..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-
-import org.apache.mina.common.ByteBuffer;
-
-public class ContentChunkAdapter
-{
- public static ContentBody toConentBody(ContentChunk contentBodyChunk)
- {
- return new ContentBody(contentBodyChunk.getData());
- }
-
- public static ContentChunk toConentChunk(final ContentBody contentBodyChunk)
- {
- return new ContentChunk() {
-
- public int getSize()
- {
- return contentBodyChunk.getSize();
- }
-
- public ByteBuffer getData()
- {
- return contentBodyChunk.payload;
- }
-
- public void reduceToFit()
- {
- contentBodyChunk.reduceBufferToFit();
- }
- };
-
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
deleted file mode 100644
index 6ee2fa784d..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-
-public class MessagePublishInfoAdapter
-{
- private final byte _majorVersion;
- private final byte _minorVersion;
- private final int _classId;
- private final int _methodId;
-
-
- public MessagePublishInfoAdapter(byte majorVersion, byte minorVersion)
- {
- _majorVersion = majorVersion;
- _minorVersion = minorVersion;
- _classId = BasicPublishBody.getClazz(majorVersion,minorVersion);
- _methodId = BasicPublishBody.getMethod(majorVersion,minorVersion);
- }
-
- public BasicPublishBody toMethodBody(MessagePublishInfo pubInfo)
- {
- return new BasicPublishBody(_majorVersion,
- _minorVersion,
- _classId,
- _methodId,
- pubInfo.getExchange(),
- pubInfo.isImmediate(),
- pubInfo.isMandatory(),
- pubInfo.getRoutingKey(),
- 0) ; // ticket
- }
-
- public MessagePublishInfo toMessagePublishInfo(final BasicPublishBody body)
- {
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return body.getExchange();
- }
-
- public boolean isImmediate()
- {
- return body.getImmediate();
- }
-
- public boolean isMandatory()
- {
- return body.getMandatory();
- }
-
- public AMQShortString getRoutingKey()
- {
- return body.getRoutingKey();
- }
- };
- }
-}