diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-03-22 13:14:42 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-03-22 13:14:42 +0000 |
commit | 3fb9be28593263e12623ce09084a230b59b81f4f (patch) | |
tree | 8de74dd781802819df0ff1ca56aaa94fa1b9b38e /java/broker/src | |
parent | b9f9c16645933e0e2f4c6c9b58e8cd1716434467 (diff) | |
download | qpid-python-3fb9be28593263e12623ce09084a230b59b81f4f.tar.gz |
made a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/java.multi_version@521253 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
46 files changed, 923 insertions, 513 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 7ceb3a7eef..be2cee79ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -433,7 +433,10 @@ public class AMQChannel } - /** Called to resend all outstanding unacknowledged messages to this same channel. */ + /** Called to resend all outstanding unacknowledged messages to this same channel. + * @param session the session + * @param requeue if true then requeue, else resend + * @throws org.apache.qpid.AMQException */ public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException { final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); @@ -752,7 +755,9 @@ public class AMQChannel for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage())); + session.getProtocolOutputConverter().writeReturn(message, _channelId, + bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 42fe8c5274..a48bc5df7f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -45,7 +45,7 @@ import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; @@ -59,7 +59,8 @@ import org.apache.qpid.url.URLSyntaxException; * Main entry point for AMQPD. * */ -public class Main implements ProtocolVersionList +@SuppressWarnings({"AccessStaticViaInstance"}) +public class Main { private static final Logger _logger = Logger.getLogger(Main.class); @@ -143,12 +144,21 @@ public class Main implements ProtocolVersionList else if (commandLine.hasOption("v")) { String ver = "Qpid 0.9.0.0"; - String protocol = "AMQP version(s) [major.minor]: "; - for (int i=0; i<pv.length; i++) + StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: "); + + boolean first = true; + for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions()) { - if (i > 0) - protocol += ", "; - protocol += pv[i][PROTOCOL_MAJOR] + "." + pv[i][PROTOCOL_MINOR]; + if(first) + { + first = false; + } + else + { + protocol.append(", "); + } + protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion()); + } System.out.println(ver + " (" + protocol + ")"); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index fdf087fdea..99cc60011a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -209,7 +209,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if(consumerTag != null) { - msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index 348bfa5e68..bdabcbf5be 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -25,7 +25,8 @@ import java.util.HashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.CommonContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQMessage; /** @@ -63,8 +64,9 @@ public class PropertyExpression implements Expression { try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; - return _properties.getReplyTo(); + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; + AMQShortString replyTo = _properties.getReplyTo(); + return replyTo == null ? null : replyTo.toString(); } catch (AMQException e) { @@ -83,8 +85,9 @@ public class PropertyExpression implements Expression { try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; - return _properties.getType(); + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; + AMQShortString type = _properties.getType(); + return type == null ? null : type.toString(); } catch (AMQException e) { @@ -126,7 +129,7 @@ public class PropertyExpression implements Expression { try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; return (int) _properties.getPriority(); } catch (AMQException e) @@ -147,8 +150,9 @@ public class PropertyExpression implements Expression try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; - return _properties.getMessageId(); + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; + AMQShortString messageId = _properties.getMessageId(); + return messageId == null ? null : messageId; } catch (AMQException e) { @@ -168,7 +172,7 @@ public class PropertyExpression implements Expression try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; return _properties.getTimestamp(); } catch (AMQException e) @@ -189,8 +193,9 @@ public class PropertyExpression implements Expression try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; - return _properties.getCorrelationId(); + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; + AMQShortString correlationId = _properties.getCorrelationId(); + return correlationId == null ? null : correlationId.toString(); } catch (AMQException e) { @@ -210,7 +215,7 @@ public class PropertyExpression implements Expression try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; return _properties.getExpiration(); } catch (AMQException e) @@ -254,7 +259,7 @@ public class PropertyExpression implements Expression else { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; if(_logger.isDebugEnabled()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index f93b2b25e6..269b68ff6b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -61,6 +61,6 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB } // this method throws an AMQException if the delivery tag is not known - channel.acknowledgeMessage(body.deliveryTag, body.multiple); + channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 7d18043f5c..c73cc73d23 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -55,15 +55,15 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC throw body.getChannelNotFoundException(evt.getChannelId()); } - channel.unsubscribeConsumer(protocolSession, body.consumerTag); - if (!body.nowait) + channel.unsubscribeConsumer(protocolSession, body.getConsumerTag()); + if (!body.getNowait()) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0, // AMQP version (major, minor) - body.consumerTag); // consumerTag + body.getConsumerTag()); // consumerTag protocolSession.writeFrame(responseFrame); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index da61f2ffd5..e96f8e8cba 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -68,14 +68,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic else { - AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue); + AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue()); if (queue == null) { - _log.info("No queue for '" + body.queue + "'"); - if (body.queue != null) + _log.info("No queue for '" + body.getQueue() + "'"); + if (body.getQueue() != null) { - String msg = "No such queue, '" + body.queue + "'"; + String msg = "No such queue, '" + body.getQueue() + "'"; throw body.getChannelException(AMQConstant.NOT_FOUND, msg); } else @@ -88,9 +88,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { try { - AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, - body.arguments, body.noLocal, body.exclusive); - if (!body.nowait) + AMQShortString consumerTag = channel.subscribeToQueue(body.getConsumerTag(), queue, session, !body.getNoAck(), + body.getArguments(), body.getNoLocal(), body.getExclusive()); + if (!body.getNowait()) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -110,9 +110,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } catch (ConsumerTagNotUniqueException e) { - AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); + AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Non-unique consumer tag, '" + body.consumerTag + "'"); + "Non-unique consumer tag, '" + body.getConsumerTag() + "'"); } catch (AMQQueue.ExistingExclusiveSubscription e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index b88c2ebf3a..7c65d94ceb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -43,15 +43,15 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB }
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
if (queue == null)
{
- _log.info("No queue for '" + body.queue + "'");
- if(body.queue!=null)
+ _log.info("No queue for '" + body.getQueue() + "'");
+ if(body.getQueue()!=null)
{
throw body.getConnectionException(AMQConstant.NOT_FOUND,
- "No such queue, '" + body.queue + "'");
+ "No such queue, '" + body.getQueue() + "'");
}
else
{
@@ -61,7 +61,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB }
else
{
- if(!queue.performGet(session, channel, !body.noAck))
+ if(!queue.performGet(session, channel, !body.getNoAck()))
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 67ade0a744..2c1f99f7c4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; @@ -61,14 +62,15 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi _log.debug("Publish received on channel " + evt.getChannelId()); } + AMQShortString exchangeName = body.getExchange(); // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? - if (body.exchange == null) + if (exchangeName == null) { - body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME; + exchangeName = ExchangeDefaults.DEFAULT_EXCHANGE_NAME; } VirtualHost vHost = session.getVirtualHost(); - Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange); + Exchange e = vHost.getExchangeRegistry().getExchange(exchangeName); // if the exchange does not exist we raise a channel exception if (e == null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 3cd6a87f64..f9afbaa954 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -47,8 +47,8 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); } - channel.setPrefetchCount(evt.getMethod().prefetchCount); - channel.setPrefetchSize(evt.getMethod().prefetchSize); + channel.setPrefetchCount(evt.getMethod().getPrefetchCount()); + channel.setPrefetchSize(evt.getMethod().getPrefetchSize()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index bc11e4652c..601187d2cd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -54,7 +54,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic throw body.getChannelNotFoundException(evt.getChannelId()); } - channel.resend(session, body.requeue); + channel.resend(session, body.getRequeue()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index ed13092ded..4a7e99a219 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -49,10 +49,10 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { AMQProtocolSession session = stateManager.getProtocolSession(); - _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue); + _logger.info("FIXME: Rejecting:" + evt.getMethod().getDeliveryTag() + ": Requeue:" + evt.getMethod().getRequeue()); int channelId = evt.getChannelId(); - UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag); + UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().getDeliveryTag()); _logger.info("Need to reject message:" + message); // if (evt.getMethod().requeue) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 9a8fce7129..8bee8d3992 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -51,8 +51,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos { AMQProtocolSession session = stateManager.getProtocolSession(); ChannelCloseBody body = evt.getMethod(); - _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + - " and method " + body.methodId); + _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.getClassId() + + " and method " + body.getMethodId()); int channelId = evt.getChannelId(); AMQChannel channel = session.getChannel(channelId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index bfa170cfc5..35e1dd68f5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -58,15 +58,15 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB throw body.getChannelNotFoundException(evt.getChannelId()); } - channel.setSuspended(!body.active); - _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active); + channel.setSuspended(!body.getActive()); + _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.getActive()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0, // AMQP version (major, minor) - body.active); // active + body.getActive()); // active session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 03fc7a3926..50125c02be 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.amqp_8_0.ChannelOpenOkBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -52,10 +53,15 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(), virtualHost.getExchangeRegistry()); session.addChannel(channel); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); - session.writeFrame(response); + + ChannelOpenOkBody channelOpenOkBody = createChannelOpenOkBody(); + + + session.writeFrame(new AMQFrame(evt.getChannelId(), channelOpenOkBody)); + } + + private ChannelOpenOkBody createChannelOpenOkBody() + { + return new ChannelOpenOkBodyImpl(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 21da03d226..6981fbc72c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -49,8 +49,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C { AMQProtocolSession session = stateManager.getProtocolSession(); final ConnectionCloseBody body = evt.getMethod(); - _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" + - body.replyText + " for " + session); + _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + + body.getReplyText() + " for " + session); try { session.closeSession(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index a85af61327..80e4c29577 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -58,13 +58,13 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con //ignore leading '/' String virtualHostName; - if((body.virtualHost != null) && body.virtualHost.charAt(0) == '/') + if((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/') { - virtualHostName = new StringBuilder(body.virtualHost.subSequence(1,body.virtualHost.length())).toString(); + virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1,body.getVirtualHost().length())).toString(); } else { - virtualHostName = body.virtualHost == null ? null : String.valueOf(body.virtualHost); + virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost()); } VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName); @@ -90,7 +90,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con // Be aware of possible changes to parameter order as versions change. AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, (byte)8, (byte)0, // AMQP version (major, minor) - body.virtualHost); + body.getVirtualHost()); stateManager.changeState(AMQState.CONNECTION_OPEN); session.writeFrame(response); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 4ad6dcde71..9f7a1c1e7c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -30,6 +30,10 @@ import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl; +import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl; +import org.apache.qpid.framing.amqp_8_0.ConnectionTuneBodyImpl; +import org.apache.qpid.framing.amqp_8_0.ConnectionSecureBodyImpl; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -68,7 +72,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener throw new AMQException("No SASL context set up in session"); } - AuthenticationResult authResult = authMgr.authenticate(ss, body.response); + AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse()); switch (authResult.status) { case ERROR: @@ -76,44 +80,32 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName()); _logger.info("Authentication failed"); stateManager.changeState(AMQState.CONNECTION_CLOSING); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame close = ConnectionCloseBody.createAMQFrame(0, - (byte)8, (byte)0, // AMQP version (major, minor) - ConnectionCloseBody.getClazz((byte)8, (byte)0), // classId - ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - AMQConstant.NOT_ALLOWED.getName()); // replyText - session.writeFrame(close); + + ConnectionCloseBody closeBody = + new ConnectionCloseBodyImpl(AMQConstant.NOT_ALLOWED.getCode(), // replyCode + AMQConstant.NOT_ALLOWED.getName(), // replyText) + ConnectionSecureOkBodyImpl.CLASS_ID, // classId + ConnectionSecureOkBodyImpl.METHOD_ID); // methodId + + session.writeFrame(new AMQFrame(0,closeBody)); disposeSaslServer(session); break; case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - // TODO: Check the value of channelMax here: This should be the max - // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire), - // not Integer.MAX_VALUE (which is signed 4 bytes). - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, - (byte)8, (byte)0, // AMQP version (major, minor) - Integer.MAX_VALUE, // channelMax - ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax - HeartbeatConfig.getInstance().getDelay()); // heartbeat - session.writeFrame(tune); + + ConnectionTuneBody tuneBody = + new ConnectionTuneBodyImpl(ConnectionStartOkMethodHandler.getConfiguredMaxChannels(), + ConnectionStartOkMethodHandler.getConfiguredFrameSize(), + HeartbeatConfig.getInstance().getDelay()); + session.writeFrame(new AMQFrame(0,tuneBody)); disposeSaslServer(session); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, - (byte)8, (byte)0, // AMQP version (major, minor) - authResult.challenge); // challenge - session.writeFrame(challenge); + ConnectionSecureBody secureBody = + new ConnectionSecureBodyImpl(authResult.challenge); + session.writeFrame(new AMQFrame(0,secureBody)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 65b79cf8e7..b14c8a5c81 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -48,6 +48,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< private static ConnectionStartOkMethodHandler _instance = new ConnectionStartOkMethodHandler(); private static final int DEFAULT_FRAME_SIZE = 65536; + private static final int DEFAULT_CHANNEL_MAX = 65536; public static StateAwareMethodListener<ConnectionStartOkBody> getInstance() { @@ -62,23 +63,23 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< { AMQProtocolSession session = stateManager.getProtocolSession(); final ConnectionStartOkBody body = evt.getMethod(); - _logger.info("SASL Mechanism selected: " + body.mechanism); - _logger.info("Locale selected: " + body.locale); + _logger.info("SASL Mechanism selected: " + body.getMechanism()); + _logger.info("Locale selected: " + body.getLocale()); AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager(); SaslServer ss = null; try { - ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN()); + ss = authMgr.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN()); session.setSaslServer(ss); - AuthenticationResult authResult = authMgr.authenticate(ss, body.response); + AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse()); //save clientProperties if (session.getClientProperties() == null) { - session.setClientProperties(body.clientProperties); + session.setClientProperties(body.getClientProperties()); } switch (authResult.status) @@ -141,6 +142,15 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< _logger.info("Framesize set to " + framesize); return framesize; } + + + static int getConfiguredMaxChannels() + { + final Configuration config = ApplicationRegistry.getInstance().getConfiguration(); + final int maxChannels = config.getInt("advanced.maxchannels", DEFAULT_CHANNEL_MAX); + _logger.info("Max Channels set to " + maxChannels); + return maxChannels; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java index ab7695955c..8a6b518934 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java @@ -49,6 +49,6 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C _logger.debug(body); } stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); - session.initHeartbeats(body.heartbeat); + session.initHeartbeats(body.getHeartbeat()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 2b123bcb2d..131d047c28 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -22,6 +22,7 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeBoundBody; import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.amqp_8_0.ExchangeBoundOkBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -74,9 +75,9 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo ExchangeBoundBody body = evt.getMethod(); - AMQShortString exchangeName = body.exchange; - AMQShortString queueName = body.queue; - AMQShortString routingKey = body.routingKey; + AMQShortString exchangeName = body.getExchange(); + AMQShortString queueName = body.getQueue(); + AMQShortString routingKey = body.getRoutingKey(); if (exchangeName == null) { throw new AMQException("Exchange exchange must not be null"); @@ -86,8 +87,8 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo if (exchange == null) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) EXCHANGE_NOT_FOUND, // replyCode new AMQShortString("Exchange " + exchangeName + " not found")); // replyText } @@ -98,16 +99,16 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo if (exchange.hasBindings()) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) OK, // replyCode null); // replyText } else { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) NO_BINDINGS, // replyCode null); // replyText } @@ -119,8 +120,8 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo if (queue == null) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) QUEUE_NOT_FOUND, // replyCode new AMQShortString("Queue " + queueName + " not found")); // replyText } @@ -129,16 +130,16 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo if (exchange.isBound(queue)) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) OK, // replyCode null); // replyText } else { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) QUEUE_NOT_BOUND, // replyCode new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName)); // replyText } @@ -151,52 +152,58 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo if (queue == null) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) QUEUE_NOT_FOUND, // replyCode new AMQShortString("Queue " + queueName + " not found")); // replyText } else { - if (exchange.isBound(body.routingKey, queue)) + if (exchange.isBound(body.getRoutingKey(), queue)) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) OK, // replyCode null); // replyText } else { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode new AMQShortString("Queue " + queueName + " not bound with routing key " + - body.routingKey + " to exchange " + exchangeName)); // replyText + body.getRoutingKey() + " to exchange " + exchangeName)); // replyText } } } else { - if (exchange.isBound(body.routingKey)) + if (exchange.isBound(body.getRoutingKey())) { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) OK, // replyCode null); // replyText } else { // AMQP version change: Be aware of possible changes to parameter order as versions change. - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - major, minor, // AMQP version (major, minor) + response = createExchangeBoundResponseFrame(evt.getChannelId(), + // AMQP version (major, minor) NO_QUEUE_BOUND_WITH_RK, // replyCode - new AMQShortString("No queue bound with routing key " + body.routingKey + + new AMQShortString("No queue bound with routing key " + body.getRoutingKey() + " to exchange " + exchangeName)); // replyText } } session.writeFrame(response); } + + private AMQFrame createExchangeBoundResponseFrame(int channelId, int replyCode, AMQShortString replyText) + { + ExchangeBoundOkBody okBody = new ExchangeBoundOkBodyImpl(replyCode,replyText); + return new AMQFrame(channelId,okBody); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index be3ffcc698..00c279f133 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -64,43 +64,43 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange final ExchangeDeclareBody body = evt.getMethod(); if (_logger.isDebugEnabled()) { - _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange); + _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange()); } synchronized(exchangeRegistry) { - Exchange exchange = exchangeRegistry.getExchange(body.exchange); + Exchange exchange = exchangeRegistry.getExchange(body.getExchange()); if (exchange == null) { - if(body.passive && ((body.type == null) || body.type.length() ==0)) + if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0)) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange()); } else { try { - exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable, - body.passive, body.ticket); + exchange = exchangeFactory.createExchange(body.getExchange(), body.getType(), body.getDurable(), + body.getPassive(), body.getTicket()); exchangeRegistry.registerExchange(exchange); } catch(AMQUnknownExchangeType e) { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e); } } } - else if (!exchange.getType().equals(body.type)) + else if (!exchange.getType().equals(body.getType())) { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); } } - if(!body.nowait) + if(!body.getNowait()) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index f9926c399c..ebae8279d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -54,7 +54,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD ExchangeDeleteBody body = evt.getMethod(); try { - exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused); + exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 4dc67b1970..2451aa2fa7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.Exchange; @@ -61,8 +62,10 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); final QueueBindBody body = evt.getMethod(); + + AMQShortString routingKey = body.getRoutingKey(); final AMQQueue queue; - if (body.queue == null) + if (body.getQueue() == null) { AMQChannel channel = session.getChannel(evt.getChannelId()); @@ -77,33 +80,35 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); } - - if (body.routingKey == null) + + + + if (routingKey == null) { - body.routingKey = queue.getName(); + routingKey = queue.getName(); } } else { - queue = queueRegistry.getQueue(body.queue); + queue = queueRegistry.getQueue(body.getQueue()); } if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); } - final Exchange exch = exchangeRegistry.getExchange(body.exchange); + final Exchange exch = exchangeRegistry.getExchange(body.getExchange()); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); } try { - queue.bind(body.routingKey, body.arguments, exch); + queue.bind(routingKey, body.getArguments(), exch); } catch (AMQInvalidRoutingKeyException rke) { - throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString()); + throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString()); } catch (AMQException e) { @@ -112,9 +117,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if (_log.isInfoEnabled()) { - _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey); + _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); } - if (!body.nowait) + if (!body.getNowait()) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 8b2467f47d..5a49368c1c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -78,10 +78,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar QueueDeclareBody body = evt.getMethod(); + AMQShortString queueName = body.getQueue(); + // if we aren't given a queue name, we create one which we return to the client - if (body.queue == null) + if (body.getQueue() == null) { - body.queue = createName(); + queueName = createName(); } AMQQueue queue; @@ -90,11 +92,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar synchronized (queueRegistry) { - if (((queue = queueRegistry.getQueue(body.queue)) == null) ) + if (((queue = queueRegistry.getQueue(queueName)) == null) ) { - if(body.passive) + if(body.getPassive()) { - String msg = "Queue: " + body.queue + " not found."; + String msg = "Queue: " + queueName + " not found."; throw body.getChannelException(AMQConstant.NOT_FOUND,msg ); } else @@ -109,8 +111,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar { Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); - queue.bind(body.queue, null, defaultExchange); - _log.info("Queue " + body.queue + " bound to default exchange"); + queue.bind(queueName, null, defaultExchange); + _log.info("Queue " + body.getQueue() + " bound to default exchange"); } } } @@ -130,7 +132,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar channel.setDefaultQueue(queue); } - if (!body.nowait) + if (!body.getNowait()) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -139,8 +141,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar (byte)8, (byte)0, // AMQP version (major, minor) queue.getConsumerCount(), // consumerCount queue.getMessageCount(), // messageCount - body.queue); // queue - _log.info("Queue " + body.queue + " declared successfully"); + queueName); // queue + _log.info("Queue " + queueName + " declared successfully"); session.writeFrame(response); } } @@ -159,11 +161,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar throws AMQException { final QueueRegistry registry = virtualHost.getQueueRegistry(); - AMQShortString owner = body.exclusive ? session.getContextKey() : null; - final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost); + AMQShortString owner = body.getExclusive() ? session.getContextKey() : null; + final AMQQueue queue = new AMQQueue(body.getQueue(), body.getDurable(), owner, body.getAutoDelete(), virtualHost); final AMQShortString queueName = queue.getName(); - if(body.exclusive && !body.durable) + if(body.getExclusive() && !body.getDurable()) { final AMQProtocolSession.Task deleteQueueTask = new AMQProtocolSession.Task() diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 0c7de312a7..97675e8731 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -65,7 +65,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete QueueDeleteBody body = evt.getMethod(); AMQQueue queue; - if(body.queue == null) + if(body.getQueue() == null) { AMQChannel channel = session.getChannel(evt.getChannelId()); @@ -79,31 +79,31 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete } else { - queue = queueRegistry.getQueue(body.queue); + queue = queueRegistry.getQueue(body.getQueue()); } if(queue == null) { if(_failIfNotFound) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); } } else { - if(body.ifEmpty && !queue.isEmpty()) + if(body.getIfEmpty() && !queue.isEmpty()) { - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty." ); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty." ); } - else if(body.ifUnused && !queue.isUnused()) + else if(body.getIfUnused() && !queue.isUnused()) { // TODO - Error code - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used." ); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used." ); } else { - int purged = queue.delete(body.ifUnused, body.ifEmpty); + int purged = queue.delete(body.getIfUnused(), body.getIfEmpty()); store.removeQueue(queue.getName()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 0c00436470..e380bb3770 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -44,7 +44,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
- if(body.queue == null)
+ if(body.getQueue() == null)
{
if (channel == null)
@@ -65,14 +65,14 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod }
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
}
else
@@ -80,7 +80,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod long purged = queue.clearQueue(channel.getStoreContext());
- if(!body.nowait)
+ if(!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index a10f44f906..18b09c00a7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.amqp_8_0.TxRollbackBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -56,10 +58,10 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod } channel.rollback(); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); + + TxRollbackBody rollbackBody = createTxRollbackBody(); + session.writeFrame(new AMQFrame(evt.getChannelId(), rollbackBody)); + //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). // Why, are we not allowed to send messages back to client before the ok method? @@ -70,4 +72,9 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); } } + + private TxRollbackBody createTxRollbackBody() + { + return new TxRollbackBodyImpl(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index a9e478e301..bd8f29526d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.amqp_8_0.TxSelectBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; @@ -55,9 +57,12 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> channel.setLocalTransactional(); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); + TxSelectBody txSelect = createTxSelectBody(); + session.writeFrame(new AMQFrame(evt.getChannelId(), txSelect)); + } + + private TxSelectBody createTxSelectBody() + { + return new TxSelectBodyImpl(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java new file mode 100644 index 0000000000..e01c5aabbf --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java @@ -0,0 +1,57 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.AMQException;
+
+public interface ProtocolOutputConverter
+{
+ void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
+
+ interface Factory
+ {
+ ProtocolOutputConverter newInstance(AMQProtocolSession session);
+ }
+
+ void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException;
+
+ void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+
+ byte getProtocolMinorVersion();
+
+ byte getProtocolMajorVersion();
+
+ void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException;
+
+ void writeFrame(AMQDataBlock block);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java new file mode 100644 index 0000000000..8366c426dd --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java @@ -0,0 +1,62 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.output.ProtocolOutputConverter.Factory;
+import org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public class ProtocolOutputConverterRegistry
+{
+
+ private static final Map<Byte, Map<Byte, Factory>> _registry =
+ new HashMap<Byte, Map<Byte, Factory>>();
+
+
+ static
+ {
+ register((byte) 8, (byte) 0, ProtocolOutputConverterImpl.getInstanceFactory());
+ }
+
+ private static void register(byte major, byte minor, Factory converter)
+ {
+ if(!_registry.containsKey(major))
+ {
+ _registry.put(major, new HashMap<Byte, Factory>());
+ }
+ _registry.get(major).put(minor, converter);
+ }
+
+
+ public static ProtocolOutputConverter getConverter(AMQProtocolSession session)
+ {
+ return _registry.get(session.getProtocolMajorVersion()).get(session.getProtocolMinorVersion()).newInstance(session);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java new file mode 100644 index 0000000000..bd5bb632fe --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -0,0 +1,288 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Iterator;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+ final long messageId = message.getMessageId();
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext,messageId, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+ final long messageId = message.getMessageId();
+
+ ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext, messageId, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ consumerTag,
+ deliveryTag, pb.getExchange(), messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ {
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ message.getMessagePublishInfo().getExchange(),
+ replyCode, replyText,
+ message.getMessagePublishInfo().getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+ ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+
+ writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ writeFrame(bodyFrameIterator.next());
+ }
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ writeFrame(BasicCancelOkBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ consumerTag // consumerTag
+ ));
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 2de32c2f0f..587cb3b2aa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -43,25 +43,15 @@ import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MainRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; -import org.apache.qpid.framing.VersionSpecificRegistry; -import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_8_0.ConnectionStartBodyImpl; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -71,7 +61,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class AMQMinaProtocolSession implements AMQProtocolSession, - ProtocolVersionList, Managable { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -111,12 +100,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private long _maxNoOfChannels = 1000; /* AMQP Version for this session */ - private byte _major = pv[pv.length - 1][PROTOCOL_MAJOR]; - private byte _minor = pv[pv.length - 1][PROTOCOL_MINOR]; + private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); + private FieldTable _clientProperties; private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); - private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length - 1][PROTOCOL_MAJOR], pv[pv.length - 1][PROTOCOL_MINOR]); + private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion); private List<Integer> _closingChannelsList = new ArrayList<Integer>(); + private ProtocolOutputConverter _protocolOutputConverter; public ManagedObject getManagedObject() @@ -195,86 +185,123 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _lastReceived = message; if (message instanceof ProtocolInitiation) { - ProtocolInitiation pi = (ProtocolInitiation) message; - // this ensures the codec never checks for a PI message again - ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); - try - { - pi.checkVersion(this); // Fails if not correct + protocolInitiationReceived((ProtocolInitiation) message); - // This sets the protocol version (and hence framing classes) for this session. - setProtocolVersion(pi.protocolMajor, pi.protocolMinor); + } + else if (message instanceof AMQFrame) + { + AMQFrame frame = (AMQFrame) message; + frameReceived(frame); - String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); + } + else + { + throw new UnknnownMessageTypeException(message); + } + } - String locales = "en_US"; + private void frameReceived(AMQFrame frame) + throws AMQException + { + int channelId = frame.getChannel(); + AMQBody body = frame.getBodyFrame(); - // Interfacing with generated code - be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, - _major, _minor, // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short) _major, // versionMajor - (short) _minor); // versionMinor - _minaProtocolSession.write(response); - } - catch (AMQException e) - { - _logger.error("Received incorrect protocol initiation", e); - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); - // TODO: Close connection (but how to wait until message is sent?) - // ritchiem 2006-12-04 will this not do? -// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); -// future.join(); -// close connection + if(_logger.isDebugEnabled()) + { + _logger.debug("Frame Received: " + frame); + } - } + if (body instanceof AMQMethodBodyImpl) + { + methodFrameReceived(channelId, (AMQMethodBodyImpl)body); + } + else if (body instanceof ContentHeaderBody) + { + contentHeaderReceived(channelId, (ContentHeaderBody)body); + } + else if (body instanceof ContentBody) + { + contentBodyReceived(channelId, (ContentBody)body); + } + else if (body instanceof HeartbeatBody) + { + // NO OP } else { - AMQFrame frame = (AMQFrame) message; - - if (frame.getBodyFrame() instanceof AMQMethodBody) - { - methodFrameReceived(frame); - } - else - { - contentFrameReceived(frame); - } + _logger.warn("Unrecognised frame " + frame.getClass().getName()); } } - private void methodFrameReceived(AMQFrame frame) + private void protocolInitiationReceived(ProtocolInitiation pi) { - if (_logger.isDebugEnabled()) + // this ensures the codec never checks for a PI message again + ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); + try + { + pi.checkVersion(); // Fails if not correct + + // This sets the protocol version (and hence framing classes) for this session. + setProtocolVersion(pi._protocolMajor, pi._protocolMinor); + + String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); + + String locales = "en_US"; + + ConnectionStartBody connectionStartBody = createConnectionStartBody(pi, + locales.getBytes(), + mechanisms.getBytes(), + null); + + // Interfacing with generated code - be aware of possible changes to parameter order as versions change. + AMQFrame response = new AMQFrame((short) 0,connectionStartBody); // versionMinor + _minaProtocolSession.write(response); + } + catch (AMQException e) { - _logger.debug("Method frame received: " + frame); + _logger.error("Received incorrect protocol initiation", e); + + _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + + // TODO: Close connection (but how to wait until message is sent?) + // ritchiem 2006-12-04 will this not do? +// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); +// future.join(); +// close connection + } + } - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), - (AMQMethodBody) frame.getBodyFrame()); + private ConnectionStartBody createConnectionStartBody(ProtocolInitiation pi, + byte[] locales, + byte[] mechanisms, + FieldTable serverProperties) + { + return new ConnectionStartBodyImpl(pi._protocolMajor, pi._protocolMinor,serverProperties,mechanisms,locales); + } + + + private void methodFrameReceived(int channelId, AMQMethodBodyImpl methodBody) + { + + final AMQMethodEvent<AMQMethodBodyImpl> evt = new AMQMethodEvent<AMQMethodBodyImpl>(channelId, + methodBody); //Check that this channel is not closing - if (channelAwaitingClosure(frame.getChannel())) + if (channelAwaitingClosure(channelId)) { if ((evt.getMethod() instanceof ChannelCloseOkBody)) { if (_logger.isInfoEnabled()) { - _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok"); + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); } } else { if (_logger.isInfoEnabled()) { - _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring"); + _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); } return; } @@ -298,19 +325,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } if (!wasAnyoneInterested) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener on Broker."); + throw new AMQNoMethodHandlerException(evt); } } catch (AMQChannelException e) { - if (getChannel(frame.getChannel()) != null) + if (getChannel(channelId) != null) { if (_logger.isInfoEnabled()) { _logger.info("Closing channel due to: " + e.getMessage()); } - writeFrame(e.getCloseFrame(frame.getChannel())); - closeChannel(frame.getChannel()); + writeFrame(e.getCloseFrame(channelId)); + closeChannel(channelId); } else { @@ -328,7 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQConstant.CHANNEL_ERROR.getName().toString()); _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(ce.getCloseFrame(frame.getChannel())); + writeFrame(ce.getCloseFrame(channelId)); } } catch (AMQConnectionException e) @@ -339,7 +366,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } closeSession(); _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(frame.getChannel())); + writeFrame(e.getCloseFrame(channelId)); } } catch (Exception e) @@ -353,61 +380,21 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - private void contentFrameReceived(AMQFrame frame) throws AMQException - { - if (frame.getBodyFrame() instanceof ContentHeaderBody) - { - contentHeaderReceived(frame); - } - else if (frame.getBodyFrame() instanceof ContentBody) - { - contentBodyReceived(frame); - } - else if (frame.getBodyFrame() instanceof HeartbeatBody) - { - _logger.debug("Received heartbeat from client"); - } - else - { - _logger.warn("Unrecognised frame " + frame.getClass().getName()); - } - } - private void contentHeaderReceived(AMQFrame frame) throws AMQException + private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content header frame received: " + frame); - } - AMQChannel channel = getChannel(frame.getChannel()); + AMQChannel channel = getAndAssertChannel(channelId); + + channel.publishContentHeader(body); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); - } - else - { - channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); - } } - private void contentBodyReceived(AMQFrame frame) throws AMQException + private void contentBodyReceived(int channelId, ContentBody body) throws AMQException { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content body frame received: " + frame); - } - AMQChannel channel = getChannel(frame.getChannel()); + AMQChannel channel = getAndAssertChannel(channelId); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); - } - else - { - channel.publishContentBody((ContentBody) frame.getBodyFrame(), this); - } + channel.publishContentBody(body, this); } /** @@ -437,6 +424,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return new ArrayList<AMQChannel>(_channelMap.values()); } + public AMQChannel getAndAssertChannel(int channelId) throws AMQException + { + AMQChannel channel = getChannel(channelId); + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); + } + return channel; + } + public AMQChannel getChannel(int channelId) throws AMQException { if (channelAwaitingClosure(channelId)) @@ -685,24 +682,26 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private void setProtocolVersion(byte major, byte minor) { - _major = major; - _minor = minor; - _registry = MainRegistry.getVersionSpecificRegistry(major, minor); + _protocolVersion = new ProtocolVersion(major,minor); + + _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion); + + _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); } public byte getProtocolMajorVersion() { - return _major; + return _protocolVersion.getMajorVersion(); } public byte getProtocolMinorVersion() { - return _minor; + return _protocolVersion.getMinorVersion(); } public boolean isProtocolVersion(byte major, byte minor) { - return _major == major && _minor == minor; + return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor; } public VersionSpecificRegistry getRegistry() @@ -739,5 +738,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _taskList.remove(task); } + public ProtocolOutputConverter getProtocolOutputConverter() + { + return _protocolOutputConverter; + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java new file mode 100644 index 0000000000..82f6e96906 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java @@ -0,0 +1,34 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.framing.AMQMethodBodyImpl;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+
+public class AMQNoMethodHandlerException extends AMQException
+{
+
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBodyImpl> evt)
+ {
+ super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 9d397505dc..756a8b5ebe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -39,7 +39,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ConnectorConfiguration; @@ -53,7 +53,7 @@ import org.apache.qpid.ssl.SSLContextFactory; * the state for the connection. * */ -public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList +public class AMQPFastProtocolHandler extends IoHandlerAdapter { private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class); @@ -162,12 +162,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); if (throwable instanceof AMQProtocolHeaderException) { - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be returned - here. */ - int i = pv.length - 1; - protocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + + protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + protocolSession.close(); + _logger.error("Error in protocol initiation " + session + ": " + throwable.getMessage(), throwable); } else if(throwable instanceof IOException) @@ -176,8 +175,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco } else { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.write(ConnectionCloseBody.createAMQFrame(0, session.getProtocolMajorVersion(), diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 503dc8b554..4cfee06850 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -162,4 +163,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession void removeSessionCloseTask(Task task); + public ProtocolOutputConverter getProtocolOutputConverter(); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index ea89136a62..50cbc35266 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -24,6 +24,7 @@ import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; import javax.management.Notification; +import javax.management.NotCompliantMBeanException; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; @@ -39,6 +40,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.management.AMQManagedObject; @@ -65,7 +67,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed new AMQShortString("Broker Management Console has closed the connection."); @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") - public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException + public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException { super(ManagedConnection.class, ManagedConnection.TYPE); _session = session; @@ -74,6 +76,8 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed _name = jmxEncode(new StringBuffer(remote), 0).toString(); init(); } + + static { try @@ -94,7 +98,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed { _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, - _channelAtttibuteNames, _channelAttributeTypes); + _channelAtttibuteNames, _channelAttributeTypes); _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); } @@ -215,18 +219,11 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed */ public void closeConnection() throws JMException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, - _session.getProtocolMajorVersion(), - _session.getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + ConnectionCloseBody connectionCloseBody = + createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText ); - _session.writeFrame(response); + _session.writeFrame(new AMQFrame(0,connectionCloseBody)); try { @@ -238,6 +235,11 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed } } + private ConnectionCloseBody createConnectionCloseBody(int code, AMQShortString replyText) + { + return new ConnectionCloseBodyImpl(code,replyText,0,0); + } + @Override public MBeanNotificationInfo[] getNotificationInfo() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java new file mode 100644 index 0000000000..45d09e8f3e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java @@ -0,0 +1,33 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.AMQException;
+
+public class UnknnownMessageTypeException extends AMQException
+{
+ public UnknnownMessageTypeException(AMQDataBlock message)
+ {
+ super("Unknown message type: " + message.getClass().getName() + ": " + message);
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index aa7ea16afc..32873c3cf5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,23 +20,34 @@ */ package org.apache.qpid.server.queue; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBodyImpl; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.txn.TransactionalContext; /** Combines the information that make up a deliverable message into a more manageable form. */ + +import org.apache.log4j.Logger; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Combines the information that make up a deliverable message into a more manageable form. + */ public class AMQMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -114,7 +125,7 @@ public class AMQMessage try { - AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index)); + AMQBodyImpl cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index)); return new AMQFrame(_channel, cb); } catch (AMQException e) @@ -136,7 +147,7 @@ public class AMQMessage } } - private StoreContext getStoreContext() + public StoreContext getStoreContext() { return _txnContext.getStoreContext(); } @@ -579,6 +590,7 @@ public class AMQMessage } } +/* public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { @@ -746,6 +758,12 @@ public class AMQMessage protocolSession.writeFrame(bodyFrameIterator.next()); } } +*/ + + public AMQMessageHandle getMessageHandle() + { + return _messageHandle; + } public long getSize() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 4fd89f39da..c9329a244c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -41,9 +41,9 @@ import javax.management.openmbean.TabularType; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.CommonContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -344,12 +344,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que try { // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; + CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; String mimeType = null, encoding = null; if (headerProperties != null) { - mimeType = headerProperties.getContentType(); - encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); + AMQShortString mimeTypeShortSting = headerProperties.getContentType(); + mimeType = mimeTypeShortSting == null ? null : mimeTypeShortSting.toString(); + encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding().toString(); } Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); @@ -382,7 +383,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que AMQMessage msg = list.get(i - 1); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; + CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) headerBody.properties; String[] headerAttributes = headerProperties.toString().split(","); Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 208a59516c..e70926736d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -270,7 +270,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue); } - msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount()); + protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(), + deliveryTag, _queue.getMessageCount()); _totalMessageSize.addAndGet(-msg.getSize()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index ede7731a06..0a2e73880c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -29,9 +29,9 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -258,7 +258,7 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } - msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); } } @@ -294,7 +294,7 @@ public class SubscriptionImpl implements Subscription msg.decrementReference(storeContext); } - msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); } } @@ -466,13 +466,9 @@ public class SubscriptionImpl implements Subscription if (_autoClose && !_sentClose) { _logger.info("Closing autoclose subscription:" + this); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - (byte) 8, (byte) 0, // AMQP version (major, minor) - consumerTag // consumerTag - )); + ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); + _sentClose = true; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index d12f5cd084..73401ee664 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -27,35 +27,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicGetBody; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ConnectionOpenBody; -import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.framing.ExchangeBoundBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeleteBody; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueuePurgeBody; -import org.apache.qpid.framing.TxCommitBody; -import org.apache.qpid.framing.TxRollbackBody; -import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.handler.BasicAckMethodHandler; diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java index e3af0bc486..2f28a3125d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.state; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java deleted file mode 100644 index 90aa7bb998..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java +++ /dev/null @@ -1,57 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-
-import org.apache.mina.common.ByteBuffer;
-
-public class ContentChunkAdapter
-{
- public static ContentBody toConentBody(ContentChunk contentBodyChunk)
- {
- return new ContentBody(contentBodyChunk.getData());
- }
-
- public static ContentChunk toConentChunk(final ContentBody contentBodyChunk)
- {
- return new ContentChunk() {
-
- public int getSize()
- {
- return contentBodyChunk.getSize();
- }
-
- public ByteBuffer getData()
- {
- return contentBodyChunk.payload;
- }
-
- public void reduceToFit()
- {
- contentBodyChunk.reduceBufferToFit();
- }
- };
-
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java deleted file mode 100644 index 6ee2fa784d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.apache.qpid.server.store;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-
-public class MessagePublishInfoAdapter
-{
- private final byte _majorVersion;
- private final byte _minorVersion;
- private final int _classId;
- private final int _methodId;
-
-
- public MessagePublishInfoAdapter(byte majorVersion, byte minorVersion)
- {
- _majorVersion = majorVersion;
- _minorVersion = minorVersion;
- _classId = BasicPublishBody.getClazz(majorVersion,minorVersion);
- _methodId = BasicPublishBody.getMethod(majorVersion,minorVersion);
- }
-
- public BasicPublishBody toMethodBody(MessagePublishInfo pubInfo)
- {
- return new BasicPublishBody(_majorVersion,
- _minorVersion,
- _classId,
- _methodId,
- pubInfo.getExchange(),
- pubInfo.isImmediate(),
- pubInfo.isMandatory(),
- pubInfo.getRoutingKey(),
- 0) ; // ticket
- }
-
- public MessagePublishInfo toMessagePublishInfo(final BasicPublishBody body)
- {
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return body.getExchange();
- }
-
- public boolean isImmediate()
- {
- return body.getImmediate();
- }
-
- public boolean isMandatory()
- {
- return body.getMandatory();
- }
-
- public AMQShortString getRoutingKey()
- {
- return body.getRoutingKey();
- }
- };
- }
-}
|