diff options
author | Kim van der Riet <kpvdr@apache.org> | 2006-12-22 17:00:28 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2006-12-22 17:00:28 +0000 |
commit | 5129ac060aed57d8e31a62c3cd64ff0ad8995949 (patch) | |
tree | 12616b7ae0cc57d7c3fb88025fd05cf31686d51a | |
parent | 142d35580b326c99a306f6476ff0a0b723db920e (diff) | |
download | qpid-python-5129ac060aed57d8e31a62c3cd64ff0ad8995949.tar.gz |
AMQP version using new generator - Part 1. In these changes, all places where version-specific info is required, it has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489691 13f79535-47bb-0310-9956-ffa450edef68
61 files changed, 643 insertions, 270 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 87691ccaa3..5e463646f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -81,7 +81,11 @@ public abstract class RequiredDeliveryException extends AMQException public CompositeAMQDataBlock getReturnMessage(int channel) { - BasicReturnBody returnBody = new BasicReturnBody(); + // AMQP version change: All generated *Body classes are now version-aware. + // Shortcut: hardwire version to 0-8 (major=8, minor=0) for now. + // TODO: Connect the version to that returned by the ProtocolInitiation + // for this session. + BasicReturnBody returnBody = new BasicReturnBody((byte)8, (byte)0); returnBody.exchange = _publishBody.exchange; returnBody.replyCode = getReplyCode(); returnBody.replyText = _message; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 673556cbec..198d2c1f3d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -54,7 +54,12 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC channel.unsubscribeConsumer(protocolSession, body.consumerTag); if(!body.nowait) { - final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), body.consumerTag); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + body.consumerTag); // consumerTag protocolSession.writeFrame(responseFrame); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 1e57c714ff..d3aece9818 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -81,7 +81,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic body.arguments, body.noLocal); if (!body.nowait) { - session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag)); // consumerTag } //now allow queue to start async processing of any backlog of messages @@ -90,16 +95,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic catch (AMQInvalidSelectorException ise) { _log.info("Closing connection due to invalid selector"); - session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(), - ise.getMessage(), BasicConsumeBody.CLASS_ID, - BasicConsumeBody.METHOD_ID)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.INVALID_SELECTOR.getCode(), // replyCode + ise.getMessage())); // replyText } catch (ConsumerTagNotUniqueException e) { String msg = "Non-unique consumer tag, '" + body.consumerTag + "'"; - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, - BasicConsumeBody.CLASS_ID, - BasicConsumeBody.METHOD_ID)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg)); // replyText } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index efdbe7aae4..423ea5f276 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -64,7 +64,15 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi protocolSession.closeChannel(evt.getChannelId()); // TODO: modify code gen to make getClazz and getMethod public methods rather than protected // then we can remove the hardcoded 0,0 - AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), 500, "Unknown exchange name", 0, 0); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + ChannelCloseBody.getClazz((byte)8, (byte)0), // classId + ChannelCloseBody.getMethod((byte)8, (byte)0), // methodId + 500, // replyCode + "Unknown exchange name"); // replyText protocolSession.writeFrame(cf); } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 1357ff16b9..379ce3d072 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -44,6 +44,9 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException { session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); - session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0))); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 0efe12b137..d26f84d17e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -55,7 +55,10 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + " and method " + body.methodId); protocolSession.closeChannel(evt.getChannelId()); - AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index 87ccc60907..27833ac250 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -58,6 +58,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB channel.setSuspended(!body.active); _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active); - AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), body.active); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + body.active); // active protocolSession.writeFrame(response); - }} + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 4cccc774ba..43d2cae8e4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -55,7 +55,10 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(), exchangeRegistry); protocolSession.addChannel(channel); - AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 7bdb1942d0..d000e3b590 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -62,7 +62,10 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C { _logger.error("Error closing protocol session: " + e, e); } - final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index bfcc50e1f8..9f9b029ada 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -64,7 +64,12 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con contextKey = generateClientID(); } protocolSession.setContextKey(contextKey); - AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, contextKey); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, + (byte)8, (byte)0, // AMQP version (major, minor) + contextKey); // knownHosts stateManager.changeState(AMQState.CONNECTION_OPEN); protocolSession.writeFrame(response); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index c32f5e4283..ea93a357d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -75,25 +75,43 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName()); _logger.info("Authentication failed"); stateManager.changeState(AMQState.CONNECTION_CLOSING); - AMQFrame close = ConnectionCloseBody.createAMQFrame(0, AMQConstant.NOT_ALLOWED.getCode(), - AMQConstant.NOT_ALLOWED.getName(), - ConnectionCloseBody.CLASS_ID, - ConnectionCloseBody.METHOD_ID); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame close = ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + ConnectionCloseBody.getClazz((byte)8, (byte)0), // classId + ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + AMQConstant.NOT_ALLOWED.getName()); // replyText protocolSession.writeFrame(close); disposeSaslServer(protocolSession); break; case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, - ConnectionStartOkMethodHandler.getConfiguredFrameSize(), - HeartbeatConfig.getInstance().getDelay()); + // TODO: Check the value of channelMax here: This should be the max + // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire), + // not Integer.MAX_VALUE (which is signed 4 bytes). + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + Integer.MAX_VALUE, // channelMax + ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax + HeartbeatConfig.getInstance().getDelay()); // heartbeat protocolSession.writeFrame(tune); disposeSaslServer(protocolSession); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); - AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + authResult.challenge); // challenge protocolSession.writeFrame(challenge); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 79b2e11bca..9f24100df1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -92,13 +92,24 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< _logger.info("Connected as: " + ss.getAuthorizationID()); stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(), - HeartbeatConfig.getInstance().getDelay()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + Integer.MAX_VALUE, // channelMax + getConfiguredFrameSize(), // frameMax + HeartbeatConfig.getInstance().getDelay()); // heartbeat protocolSession.writeFrame(tune); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); - AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + authResult.challenge); // challenge protocolSession.writeFrame(challenge); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 5aaf78d6b7..30e8990b54 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -64,6 +64,11 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + byte major = (byte)8; + byte minor = (byte)0; + ExchangeBoundBody body = evt.getMethod(); String exchangeName = body.exchange; @@ -77,8 +82,11 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo AMQFrame response; if (exchange == null) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND, - "Exchange " + exchangeName + " not found"); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + EXCHANGE_NOT_FOUND, // replyCode + "Exchange " + exchangeName + " not found"); // replyText } else if (routingKey == null) { @@ -86,11 +94,19 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { if (exchange.hasBindings()) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + NO_BINDINGS, // replyCode + null); // replyText } } else @@ -98,20 +114,29 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo AMQQueue queue = queueRegistry.getQueue(queueName); if (queue == null) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, - "Queue " + queueName + " not found"); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + QUEUE_NOT_FOUND, // replyCode + "Queue " + queueName + " not found"); // replyText } else { if (exchange.isBound(queue)) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND, - "Queue " + queueName + " not bound to exchange " + - exchangeName); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + QUEUE_NOT_BOUND, // replyCode + "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText } } } @@ -121,24 +146,30 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo AMQQueue queue = queueRegistry.getQueue(queueName); if (queue == null) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, - "Queue " + queueName + " not found"); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + QUEUE_NOT_FOUND, // replyCode + "Queue " + queueName + " not found"); // replyText } else { if (exchange.isBound(body.routingKey, queue)) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, - null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { + // AMQP version change: Be aware of possible changes to parameter order as versions change. response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, - "Queue " + queueName + - " not bound with routing key " + - body.routingKey + " to exchange " + - exchangeName); + major, minor, // AMQP version (major, minor) + SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode + "Queue " + queueName + " not bound with routing key " + + body.routingKey + " to exchange " + exchangeName); // replyText } } } @@ -146,16 +177,20 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { if (exchange.isBound(body.routingKey)) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, - null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { + // AMQP version change: Be aware of possible changes to parameter order as versions change. response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - NO_QUEUE_BOUND_WITH_RK, - "No queue bound with routing key " + - body.routingKey + " to exchange " + - exchangeName); + major, minor, // AMQP version (major, minor) + NO_QUEUE_BOUND_WITH_RK, // replyCode + "No queue bound with routing key " + body.routingKey + + " to exchange " + exchangeName); // replyText } } protocolSession.writeFrame(response); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index b7c75e290a..7937a9bb2d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -75,7 +75,10 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange } if(!body.nowait) { - AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 93ef902190..153a9de4c4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -53,7 +53,10 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD try { exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused); - AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } catch (ExchangeInUseException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index cf9e40a660..b7fc786981 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -90,7 +90,10 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } if (!body.nowait) { - final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index b7004de2a9..83f98de2d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -102,7 +102,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } if (!body.nowait) { - AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), body.queue, 0L, 0L); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + 0L, // consumerCount + 0L, // messageCount + body.queue); // queue _log.info("Queue " + body.queue + " declared successfully"); protocolSession.writeFrame(response); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 0dbc54f29b..688968b8a0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -81,7 +81,12 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete { int purged = queue.delete(body.ifUnused, body.ifEmpty); _store.removeQueue(queue.getName()); - session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), purged)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + purged)); // messageCount } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index ac864cab6c..7fcad5bbf3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -52,7 +52,10 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> try{ AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); channel.commit(); - protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); channel.processReturns(protocolSession); }catch(AMQException e){ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 475f6ecacf..588dc026d4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -51,7 +51,10 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod try{ AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); channel.rollback(); - protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). channel.resend(protocolSession); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index c30bc7d66f..7df3825d8a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -48,6 +48,9 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> AMQMethodEvent<TxSelectBody> evt) throws AMQException { protocolSession.getChannel(evt.getChannelId()).setTransactional(true); - protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 7a9dfbc67c..9ff6b96690 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -165,8 +165,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _minor = pi.protocolMinor; String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); String locales = "en_US"; - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null, - mechanisms.getBytes(), locales.getBytes()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, + (byte)8, (byte)0, // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short)8, // versionMajor + (short)0 // versionMinor + ); _minaProtocolSession.write(response); } catch (AMQException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 18980f440b..2e9590277b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -168,11 +168,20 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco } else if(throwable instanceof IOException) { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); + _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); } else { - protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.write(ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + 200, // replyCode + throwable.getMessage() // replyText + )); _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); protocolSession.close(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index d57f9b9be1..0ceadcb30b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -193,8 +193,16 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public void closeConnection() throws JMException { - final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(), - "Broker Management Console has closing the connection.", 0, 0); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + "Broker Management Console has closing the connection." // replyText + ); _session.writeFrame(response); try diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b27cd807c0..afe4ea95b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -157,10 +157,20 @@ public class AMQMessage public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag) { + AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; - allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag, deliveryTag, _redelivered, - getExchangeName(), getRoutingKey()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + allFrames[0] = BasicDeliverBody.createAMQFrame(channel, + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + getExchangeName(), // exchange + _redelivered, // redelivered + getRoutingKey() // routingKey + ); allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); for (int i = 2; i < allFrames.length; i++) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 4272541298..78310e8eb3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -379,7 +379,13 @@ public class SubscriptionImpl implements Subscription if (!_closed) { _logger.info("Closing autoclose subscription:" + this); - protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag // consumerTag + )); _closed = true; } } @@ -392,9 +398,17 @@ public class SubscriptionImpl implements Subscription private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, - deliveryTag, false, exchange, - routingKey); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + exchange, // exchange + false, // redelivered + routingKey // routingKey + ); ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? deliverFrame.writePayload(buf); buf.flip(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 820b8c3f83..58ac49dd4e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -465,12 +465,25 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class); + ChannelOpenBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null), // outOfBand + ChannelOpenOkBody.class); //todo send low water mark when protocol allows. + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false), + BasicQosBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + false, // global + prefetchHigh, // prefetchCount + 0), // prefetchSize BasicQosOkBody.class); if (transacted) @@ -479,7 +492,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.debug("Issuing TxSelect for " + channelId); } - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2136d565f1..8f85aedb1f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -477,7 +477,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Commits outstanding messages sent and outstanding acknowledgements. - _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class); } catch (AMQException e) { @@ -492,8 +495,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkTransacted(); try { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class); + TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class); } catch (AMQException e) { @@ -516,8 +522,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { _connection.getProtocolHandler().closeSession(this); - final AMQFrame frame = ChannelCloseBody.createAMQFrame( - getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + "JMS client closing channel"); // replyText _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -707,7 +720,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.clearUnackedMessages(); } - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -1039,7 +1057,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void declareExchangeSynch(String name, String type) throws AMQException { - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + false, // nowait + false, // passive + 0, // ticket + type); // type _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); } @@ -1050,7 +1081,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) { - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + true, // nowait + false, // passive + 0, // ticket + type); // type protocolHandler.writeFrame(exchangeDeclare); } @@ -1072,9 +1116,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqd.setQueueName(protocolHandler.generateQueueName()); } - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(), - false, amqd.isDurable(), amqd.isExclusive(), - amqd.isAutoDelete(), true, null); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + true, // nowait + false, // passive + amqd.getQueueName(), // queue + 0); // ticket protocolHandler.writeFrame(queueDeclare); return amqd.getQueueName(); @@ -1082,9 +1136,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException { - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0, - queueName, amqd.getExchangeName(), - amqd.getRoutingKey(), true, ft); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + ft, // arguments + amqd.getExchangeName(), // exchange + true, // nowait + queueName, // queue + amqd.getRoutingKey(), // routingKey + 0); // ticket protocolHandler.writeFrame(queueBind); } @@ -1122,10 +1184,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, - queueName, tag, consumer.isNoLocal(), - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, - consumer.isExclusive(), nowait, arguments); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + 0); // ticket if (nowait) { protocolHandler.writeFrame(jmsConsume); @@ -1302,8 +1373,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { try { - AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false, - false, true); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + 0); // ticket _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } catch (AMQException e) @@ -1389,8 +1468,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isQueueBound(String queueName, String routingKey) throws JMSException { - AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME, - routingKey, queueName); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange + queueName, // queue + routingKey); // routingKey AMQMethodEvent response = null; try { @@ -1447,7 +1532,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void acknowledgeMessage(long deliveryTag, boolean multiple) { - final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + deliveryTag, // deliveryTag + multiple); // multiple if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); @@ -1606,14 +1697,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void suspendChannel() { _logger.warn("Suspending channel"); - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + false); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } private void unsuspendChannel() { _logger.warn("Unsuspending channel"); - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + true); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index cefaca8d52..1033e827de 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -448,7 +448,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if(sendClose) { - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + _consumerTag, // consumerTag + false); // nowait try { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 7a5fcbccf9..d38e461400 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -134,9 +134,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false - AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), - destination.getExchangeClass(), false, - false, false, false, true, null); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + destination.getExchangeName(), // exchange + false, // internal + true, // nowait + false, // passive + 0, // ticket + destination.getExchangeClass()); // type _protocolHandler.writeFrame(declare); } @@ -512,8 +523,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j AbstractJMSMessage message = convertToNativeMessage(origMessage); message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL()); - AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), - destination.getRoutingKey(), mandatory, immediate); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + destination.getExchangeName(), // exchange + immediate, // immediate + mandatory, // mandatory + destination.getRoutingKey(), // routingKey + 0); // ticket long currentTime = 0; if (!_disableTimestamps) @@ -555,7 +574,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } // weight argument of zero indicates no child content headers, just bodies - AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0, + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0, contentHeaderProperties, size); if (_logger.isDebugEnabled()) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index fd2968cdfd..278f0906ea 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -57,7 +57,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); } - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); evt.getProtocolSession().writeFrame(frame); if (errorCode != AMQConstant.REPLY_SUCCESS.getCode()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index dd9fd651c1..bbfb100b25 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -59,7 +59,10 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener String reason = method.replyText; // TODO: check whether channel id of zero is appropriate - evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)8, (byte)0)); if (errorCode != 200) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java index fd15faf429..153b641a39 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java @@ -54,7 +54,12 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener { // Evaluate server challenge byte[] response = client.evaluateChallenge(body.challenge); - AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + response); // response evt.getProtocolSession().writeFrame(responseFrame); } catch (SaslException e) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index cfc3c2898b..8640bbb999 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -126,8 +126,15 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName()); clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion()); clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo()); - ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism, - saslResponse, selectedLocale)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + clientProperties, // clientProperties + selectedLocale, // locale + mechanism, // mechanism + saslResponse)); // response } catch (UnsupportedEncodingException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index 8fee277392..3592ee4c53 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -72,11 +72,25 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) { - return ConnectionOpenBody.createAMQFrame(channel, path, capabilities, insist); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + return ConnectionOpenBody.createAMQFrame(channel, + (byte)8, (byte)0, // AMQP version (major, minor) + capabilities, // capabilities + insist, // insist + path); // virtualHost } protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params) { - return ConnectionTuneOkBody.createAMQFrame(channel, params.getChannelMax(), params.getFrameMax(), params.getHeartbeat()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + return ConnectionTuneOkBody.createAMQFrame(channel, + (byte)8, (byte)0, // AMQP version (major, minor) + params.getChannelMax(), // channelMax + params.getFrameMax(), // frameMax + params.getHeartbeat()); // heartbeat } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index eab9084717..f37af835e1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -472,8 +472,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _stateManager.changeState(AMQState.CONNECTION_CLOSING); - final AMQFrame frame = ConnectionCloseBody.createAMQFrame( - 0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + "JMS client is closing the connection."); // replyText syncWrite(frame, ConnectionCloseOkBody.class); _protocolSession.closeProtocolSession(); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index 07d572d27f..5209df59cd 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -112,7 +112,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, private void ping(Broker b) throws AMQException { - ClusterPingBody ping = new ClusterPingBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0); ping.broker = _group.getLocal().getDetails(); ping.responseRequired = true; ping.load = _loadTable.getLocalLoad(); @@ -158,7 +160,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, Broker leader = connectToLeader(member); _logger.info(new LogMessage("Connected to {0}. joining", leader)); - ClusterJoinBody join = new ClusterJoinBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0); join.broker = _group.getLocal().getDetails(); send(leader, new SimpleSendable(join)); } @@ -177,7 +181,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, public void leave() throws AMQException { - ClusterLeaveBody leave = new ClusterLeaveBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0); leave.broker = _group.getLocal().getDetails(); send(getLeader(), new SimpleSendable(leave)); } @@ -198,7 +204,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, } else { - ClusterSuspectBody suspect = new ClusterSuspectBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0); suspect.broker = broker.getDetails(); send(getLeader(), new SimpleSendable(suspect)); } @@ -220,7 +228,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, else { //pass request on to leader: - ClusterJoinBody request = new ClusterJoinBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0); request.broker = member.getDetails(); Broker leader = getLeader(); send(leader, new SimpleSendable(request)); @@ -265,7 +275,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, private ClusterMembershipBody createAnnouncement(String membership) { - ClusterMembershipBody announce = new ClusterMembershipBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0); //TODO: revise this way of converting String to bytes... announce.members = membership.getBytes(); return announce; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java index 0836e9d5fa..93515e42b6 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -48,7 +48,13 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session)); - session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), evt.getMethod().queue)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + evt.getMethod().queue // consumerTag + )); } else { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index 3bd9f5d387..832e4830ab 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -51,7 +51,9 @@ class ConsumerCounts { for(String queue : _counts.keySet()) { - BasicConsumeBody m = new BasicConsumeBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0); m.queue = queue; m.consumerTag = queue; replay(m, messages); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index 4a00b5cbc3..ce3e71f0a5 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -44,15 +44,19 @@ import java.util.Arrays; public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + private final byte major = (byte)8; + private final byte minor = (byte)0; private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] { - new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody()), - new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody()), - new FrameDescriptor(QueueBindBody.class, new QueueBindBody()), - new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody()), - new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody()), - new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody()), - new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody()) + new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor)), + new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor)), + new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)), + new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)), + new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)), + new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)), + new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor)) }); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index fa737cd1b6..338817e892 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -124,7 +124,9 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener } } _consumers.replay(methods); - methods.add(new ClusterSynchBody()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + methods.add(new ClusterSynchBody((byte)8, (byte)0)); return methods; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index ee16f6062f..8765aebf77 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -75,7 +75,9 @@ public class ClusteredQueue extends AMQQueue delete(); //send deletion request to all other members: - QueueDeleteBody request = new QueueDeleteBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); request.queue = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } @@ -87,7 +89,9 @@ public class ClusteredQueue extends AMQQueue super.unregisterProtocolSession(ps, channel, consumerTag); //signal other members: - BasicCancelBody request = new BasicCancelBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0); request.consumerTag = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index a3af0fedc7..94f17cb9d3 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -56,7 +56,9 @@ public class PrivateQueue extends AMQQueue super.autodelete(); //send delete request to peers: - QueueDeleteBody request = new QueueDeleteBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); request.queue = getName(); _groupMgr.broadcast(new SimpleSendable(request)); } diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java index f7fe5dc35a..ed18710c64 100644 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java @@ -148,6 +148,9 @@ public class BrokerTest extends TestCase TestMethod(Object id) { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + super((byte)8, (byte)0); this.id = id; } diff --git a/java/common/pom.xml b/java/common/pom.xml index 653b2a8a9d..053fb5fafb 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -35,14 +35,12 @@ <properties> <topDirectoryLocation>..</topDirectoryLocation> - <cluster.asl>${basedir}/src/main/xsl/cluster.asl</cluster.asl> - <spec.stylesheet>${basedir}/src/main/xsl/framing.xsl</spec.stylesheet> - <registry.stylesheet>${basedir}/src/main/xsl/registry.xsl</registry.stylesheet> - <registry.template>${basedir}/src/main/xsl/registry.template</registry.template> + <gentools.home>${topDirectoryLocation}/../gentools</gentools.home> <generated.path>${project.build.directory}/generated-sources/xsl</generated.path> <generated.package>org/apache/qpid/framing</generated.package> <generated.dir>${generated.path}/${generated.package}</generated.dir> <specs.dir>${topDirectoryLocation}/../specs</specs.dir> + <cluster.asl>${basedir}/src/main/xsl/cluster.asl</cluster.asl> </properties> <build> @@ -57,13 +55,10 @@ <configuration> <tasks> <ant antfile="protocol-version.xml"> - <property name="cluster.asl" value="${cluster.asl}"/> - <property name="spec.stylesheet" value="${spec.stylesheet}"/> - <property name="registry.stylesheet" value="${registry.stylesheet}"/> - <property name="registry.template" value="${registry.template}"/> + <property name="gentools.home" value="${gentools.home}"/> <property name="generated.dir" value="${generated.dir}"/> - <property name="proto_version" value="${generated.dir}/ProtocolVersionList.java"/> - <property name="specs.dir" value="${specs.dir}"/> + <property name="cluster.asl" value="${cluster.asl}"/> + <property name="xml.spec.list" value="${specs.dir}/amqp-8.0.xml ${cluster.asl}"/> </ant> </tasks> <sourceRoot>${generated.path}</sourceRoot> diff --git a/java/common/protocol-version.xml b/java/common/protocol-version.xml index 96ce348523..6a92dfbe2b 100644 --- a/java/common/protocol-version.xml +++ b/java/common/protocol-version.xml @@ -20,102 +20,21 @@ --> <project name="Qpid Common Protocol Versions" default="generate"> - <property name="saxon.jar" value="lib/saxon/saxon8.jar"/> - <!-- temporarily hard-wired XML spec version for build avoidance --> - <property name="amqp.xml" value="${specs.dir}/amqp-8.0.xml"/> - - <macrodef name="saxon"> - <attribute name="out"/> - <attribute name="src"/> - <attribute name="xsl"/> - <element name="args" implicit="true" optional="true"/> - <sequential> - <java jar="${saxon.jar}" fork="true"> - <arg value="-o"/> - <arg value="@{out}"/> - <arg value="@{src}"/> - <arg value="@{xsl}"/> - <args/> - </java> - </sequential> - </macrodef> - - <macrodef name="amqp"> - <attribute name="ver"/> - <sequential> - <!-- Check for the existence of the AMQP specification file --> - <property name="amqpspecfile-@{ver}" value="${specs.dir}/amqp-@{ver}.xml"/> - <available file="${specs.dir}/amqp-@{ver}.xml" - property="amqpspecfile.present-@{ver}"/> - <fail unless="amqpspecfile.present-@{ver}" - message="ERROR: AMQP specification file ${specs.dir}/amqp-@{ver}.xml not found."/> - - <!-- Read in the file as a set of properties; extract the amqp version --> - <xmlproperty prefix="@{ver}" file="${specs.dir}/amqp-@{ver}.xml"/> - <echo>Found AMQP specification file "${specs.dir}/amqp-@{ver}.xml"; major=${@{ver}.amqp(major)} minor=${@{ver}.amqp(minor)}</echo> - - <!-- Add the version to the ProtocolVersionList.java file --> - <replaceregexp file="${proto_version}" match=" // !VER!" - replace=",${line.separator} {${@{ver}.amqp(major)}, ${@{ver}.amqp(minor)}} // !VER!" - flags="s" byline="true"/> - <replaceregexp file="${proto_version}" match=" // !VER1!" - replace="{${@{ver}.amqp(major)}, ${@{ver}.amqp(minor)}} // !VER!" - flags="s" byline="true"/> - - <!-- Create directory; generate from specification file --> - <saxon out="${generated.dir}/results.out" - src="${specs.dir}/amqp-@{ver}.xml" - xsl="${spec.stylesheet}"> - <arg value="major=${@{ver}.amqp(major)}"/> - <arg value="minor=${@{ver}.amqp(minor)}"/> - <arg value="registry_name=MainRegistry"/> - </saxon> - <!-- --> - <saxon out="${generated.dir}/cluster.out" - src="${cluster.asl}" - xsl="${spec.stylesheet}"> - <arg value="major=${@{ver}.amqp(major)}"/> - <arg value="minor=${@{ver}.amqp(minor)}"/> - <arg value="registry_name=ClusterRegistry"/> - </saxon> - <saxon out="${generated.dir}/registry.out" - src="${registry.template}" - xsl="${registry.stylesheet}"> - <arg value="major=${@{ver}.amqp(major)}"/> - <arg value="minor=${@{ver}.amqp(minor)}"/> - </saxon> - </sequential> - </macrodef> - - <uptodate property="generated" targetfile="${generated.dir}/results.out" - srcfile="${amqp.xml}"/> - - <target name="generate" unless="generated"> +<!-- + <property name="specs.dir" value="../../specs"/> + <property name="gentools.home" value="../../gentools"/> + <property name="generated.dir" value="target/generated-sources/xsl/org/apache/qpid/framing"/> + <property name="cluster.asl" value="src/main/xsl/cluster.asl"/> + <property name="xml.spec.list" value="${specs.dir}/amqp-8.0.xml ${cluster.asl}"/> +--> + + <target name="generate"> <mkdir dir="${generated.dir}"/> - <copy file="src/main/versions/ProtocolVersionList.java.tmpl" tofile="${proto_version}" - overwrite="true"/> - <!-- - NOTE: Set the AMQP version numbers to be supported in this build here. - The last version in this list will be the version returned when a protocol - ProtocolInitiation NAK frame is returned by the broker. Usually this is the - highest or most recent version. - --> - <!-- <amqp ver="0.8"/> - <amqp ver="0.9"/> - <amqp ver="0.10"/> --> - <amqp ver="8.0"/> - -<!-- <saxon out="${generated.dir}/results.out" src="${amqp.xml}" - xsl="${stylesheet}"> - <arg value="asl_base=${asl.base}"/> - <arg value="registry_name=MainRegistry"/> - </saxon> - <saxon out="${generated.dir}/cluster.out" src="${cluster.asl}" - xsl="${stylesheet}"> - <arg value="registry_name=ClusterRegistry"/> - </saxon> - <saxon out="${generated.dir}/registry.out" src="${registry_template}" - xsl="${registry_stylesheet}"/> --> + <exec dir="${gentools.home}/src" executable="pwd" /> + <echo>XML files to be processed: ${xml.spec.list}</echo> + <java classname="org.apache.qpid.gentools.Main" fork="true" dir="${gentools.home}/src"> + <arg line="-j -o ${generated.dir} -t ${gentools.home}/templ.java ${xml.spec.list}"/> + </java> </target> <target name="precompile" depends="generate"/> diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 4d604f8c0b..2ead0a03e6 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -27,23 +27,30 @@ public class AMQChannelException extends AMQException { private final int _classId; private final int _methodId; + /* AMQP version for which exception ocurred */ + private final byte major; + private final byte minor; - public AMQChannelException(int errorCode, String msg, int classId, int methodId, Throwable t) + public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) { super(errorCode, msg, t); _classId = classId; _methodId = methodId; + this.major = major; + this.minor = minor; } - public AMQChannelException(int errorCode, String msg, int classId, int methodId) + public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor) { super(errorCode, msg); _classId = classId; _methodId = methodId; + this.major = major; + this.minor = minor; } public AMQFrame getCloseFrame(int channel) { - return ChannelCloseBody.createAMQFrame(channel, getErrorCode(), getMessage(), _classId, _methodId); + return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), getMessage()); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index d829144b11..36287d2923 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -34,5 +34,6 @@ public abstract class AMQBody protected abstract void writePayload(ByteBuffer buffer); - protected abstract void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; + protected abstract void populateFromBuffer(ByteBuffer buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 438bfa8d82..2a999fe130 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -81,7 +81,7 @@ public class AMQDataBlockDecoder } protected Object createAndPopulateFrame(ByteBuffer in) - throws AMQFrameDecodingException + throws AMQFrameDecodingException, AMQProtocolVersionException { final byte type = in.get(); if (!isSupportedFrameType(type)) diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index e75f37d623..6af691fbe8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -62,7 +62,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock * @throws AMQFrameDecodingException */ public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory) - throws AMQFrameDecodingException + throws AMQFrameDecodingException, AMQProtocolVersionException { this.channel = channel; bodyFrame = bodyFactory.createBody(buffer); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 6659b4ff8f..5ccc900b2c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -26,6 +26,20 @@ import org.apache.qpid.AMQChannelException; public abstract class AMQMethodBody extends AMQBody { public static final byte TYPE = 1; + + /** + * AMQP version + */ + protected byte major; + protected byte minor; + public byte getMajor() { return major; } + public byte getMinor() { return minor; } + + public AMQMethodBody(byte major, byte minor) + { + this.major = major; + this.minor = minor; + } /** unsigned short */ protected abstract int getBodySize(); @@ -80,11 +94,11 @@ public abstract class AMQMethodBody extends AMQBody */ public AMQChannelException getChannelException(int code, String message) { - return new AMQChannelException(code, message, getClazz(), getMethod()); + return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor); } public AMQChannelException getChannelException(int code, String message, Throwable cause) { - return new AMQChannelException(code, message, getClazz(), getMethod(), cause); + return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 107af67dc7..da0909d32f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -41,6 +41,11 @@ public class AMQMethodBodyFactory implements BodyFactory public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException { - return MethodBodyDecoderRegistry.get(in.getUnsignedShort(), in.getUnsignedShort()); + // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML + // segments generated together are now handled by MainRegistry. The Cluster class, + // if generated together with amqp.xml is a part of MainRegistry. + // TODO: Connect with version acquired from ProtocolInitiation class. + return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), + (byte)8, (byte)0); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 61837f65cc..fc80d93f82 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -245,7 +245,7 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties } public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException + throws AMQFrameDecodingException, AMQProtocolVersionException { _propertyFlags = propertyFlags; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index a59869b1d8..4ee36ee831 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -58,7 +58,8 @@ public class ContentHeaderBody extends AMQBody return TYPE; } - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + protected void populateFromBuffer(ByteBuffer buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException { classId = buffer.getUnsignedShort(); weight = buffer.getUnsignedShort(); @@ -75,7 +76,8 @@ public class ContentHeaderBody extends AMQBody * @return * @throws AMQFrameDecodingException */ - public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException { ContentHeaderBody body = new ContentHeaderBody(); body.populateFromBuffer(buffer, size); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java index 561d7852fd..88bdefca88 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -41,7 +41,7 @@ public interface ContentHeaderProperties * @throws AMQFrameDecodingException when the buffer does not contain valid data */ void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException; + throws AMQFrameDecodingException, AMQProtocolVersionException; /** * @return the size of the encoded property list in bytes. diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index cec413cb9d..cfcc5db857 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -37,16 +37,19 @@ public class ContentHeaderPropertiesFactory public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, ByteBuffer buffer, int size) - throws AMQFrameDecodingException + throws AMQFrameDecodingException, AMQProtocolVersionException { ContentHeaderProperties properties; - switch (classId) + // AMQP version change: "Hardwired" version to major=8, minor=0 + // TODO: Change so that the actual version is obtained from + // the ProtocolInitiation object for this session. + if (classId == BasicConsumeBody.getClazz((byte)8, (byte)0)) { - case BasicConsumeBody.CLASS_ID: - properties = new BasicContentHeaderProperties(); - break; - default: - throw new AMQFrameDecodingException("Unsupport content header class id: " + classId); + properties = new BasicContentHeaderProperties(); + } + else + { + throw new AMQFrameDecodingException("Unsupport content header class id: " + classId); } properties.populatePropertiesFromBuffer(buffer, propertyFlags, size); return properties; diff --git a/java/common/src/main/xsl/cluster.asl b/java/common/src/main/xsl/cluster.asl index 40ca937904..09e8ca0787 100644 --- a/java/common/src/main/xsl/cluster.asl +++ b/java/common/src/main/xsl/cluster.asl @@ -29,26 +29,26 @@ provide a clustered service to clients. </doc> -<method name = "join"> +<method name = "join" index="10"> <field name = "broker" type = "shortstr" /> </method> -<method name = "membership"> +<method name = "membership" index="20"> <field name = "members" type = "longstr" /> </method> -<method name = "synch"> +<method name = "synch" index="30"> </method> -<method name = "leave"> +<method name = "leave" index="40"> <field name = "broker" type = "shortstr" /> </method> -<method name = "suspect"> +<method name = "suspect" index="50"> <field name = "broker" type = "shortstr" /> </method> -<method name = "ping"> +<method name = "ping" index="60"> <field name = "broker" type = "shortstr" /> <field name = "load" type = "long" /> <field name = "response required" type = "bit" /> diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index bcf1602433..23f7f3d53b 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -140,7 +140,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase static BasicPublishBody getPublishRequest(String id) { - BasicPublishBody request = new BasicPublishBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Establish some way to determine the version for the test. + BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0); request.routingKey = id; return request; } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 8de3c8bf33..fafb87abd5 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -148,7 +148,9 @@ public class AMQQueueMBeanTest extends TestCase private AMQMessage message(boolean immediate) throws AMQException { - BasicPublishBody publish = new BasicPublishBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Establish some way to determine the version for the test. + BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); publish.immediate = immediate; ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = 1000; // in bytes diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java index c2511f0a99..11bae0d9f6 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -80,7 +80,9 @@ public class AckTest extends TestCase { for (int i = 1; i <= count; i++) { - BasicPublishBody publishBody = new BasicPublishBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Establish some way to determine the version for the test. + BasicPublishBody publishBody = new BasicPublishBody((byte)8, (byte)0); publishBody.routingKey = "rk"; publishBody.exchange = "someExchange"; AMQMessage msg = new AMQMessage(_messageStore, publishBody); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java index 8570e6521f..6b764acd54 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -46,7 +46,9 @@ class MessageTestHelper extends TestCase AMQMessage message(boolean immediate) throws AMQException { - BasicPublishBody publish = new BasicPublishBody(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Establish some way to determine the version for the test. + BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); publish.immediate = immediate; return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), null); } |