summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2006-12-22 17:00:28 +0000
committerKim van der Riet <kpvdr@apache.org>2006-12-22 17:00:28 +0000
commit5129ac060aed57d8e31a62c3cd64ff0ad8995949 (patch)
tree12616b7ae0cc57d7c3fb88025fd05cf31686d51a
parent142d35580b326c99a306f6476ff0a0b723db920e (diff)
downloadqpid-python-5129ac060aed57d8e31a62c3cd64ff0ad8995949.tar.gz
AMQP version using new generator - Part 1. In these changes, all places where version-specific info is required, it has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489691 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java85
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java149
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java11
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java24
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java18
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java4
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java3
-rw-r--r--java/common/pom.xml15
-rw-r--r--java/common/protocol-version.xml109
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelException.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQBody.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java18
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java17
-rw-r--r--java/common/src/main/xsl/cluster.asl12
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java4
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java4
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java4
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java4
61 files changed, 643 insertions, 270 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 87691ccaa3..5e463646f9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -81,7 +81,11 @@ public abstract class RequiredDeliveryException extends AMQException
public CompositeAMQDataBlock getReturnMessage(int channel)
{
- BasicReturnBody returnBody = new BasicReturnBody();
+ // AMQP version change: All generated *Body classes are now version-aware.
+ // Shortcut: hardwire version to 0-8 (major=8, minor=0) for now.
+ // TODO: Connect the version to that returned by the ProtocolInitiation
+ // for this session.
+ BasicReturnBody returnBody = new BasicReturnBody((byte)8, (byte)0);
returnBody.exchange = _publishBody.exchange;
returnBody.replyCode = getReplyCode();
returnBody.replyText = _message;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index 673556cbec..198d2c1f3d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -54,7 +54,12 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
if(!body.nowait)
{
- final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), body.consumerTag);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ body.consumerTag); // consumerTag
protocolSession.writeFrame(responseFrame);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 1e57c714ff..d3aece9818 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -81,7 +81,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
body.arguments, body.noLocal);
if (!body.nowait)
{
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag)); // consumerTag
}
//now allow queue to start async processing of any backlog of messages
@@ -90,16 +95,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
catch (AMQInvalidSelectorException ise)
{
_log.info("Closing connection due to invalid selector");
- session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
- ise.getMessage(), BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.INVALID_SELECTOR.getCode(), // replyCode
+ ise.getMessage())); // replyText
}
catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
- BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index efdbe7aae4..423ea5f276 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -64,7 +64,15 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
protocolSession.closeChannel(evt.getChannelId());
// TODO: modify code gen to make getClazz and getMethod public methods rather than protected
// then we can remove the hardcoded 0,0
- AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), 500, "Unknown exchange name", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ChannelCloseBody.getClazz((byte)8, (byte)0), // classId
+ ChannelCloseBody.getMethod((byte)8, (byte)0), // methodId
+ 500, // replyCode
+ "Unknown exchange name"); // replyText
protocolSession.writeFrame(cf);
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
index 1357ff16b9..379ce3d072 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -44,6 +44,9 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
{
session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
- session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0)));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 0efe12b137..d26f84d17e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -55,7 +55,10 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
protocolSession.closeChannel(evt.getChannelId());
- AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index 87ccc60907..27833ac250 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -58,6 +58,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
channel.setSuspended(!body.active);
_logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
- AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), body.active);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ body.active); // active
protocolSession.writeFrame(response);
- }}
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 4cccc774ba..43d2cae8e4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -55,7 +55,10 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(),
exchangeRegistry);
protocolSession.addChannel(channel);
- AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index 7bdb1942d0..d000e3b590 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -62,7 +62,10 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
{
_logger.error("Error closing protocol session: " + e, e);
}
- final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index bfcc50e1f8..9f9b029ada 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -64,7 +64,12 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
contextKey = generateClientID();
}
protocolSession.setContextKey(contextKey);
- AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, contextKey);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ contextKey); // knownHosts
stateManager.changeState(AMQState.CONNECTION_OPEN);
protocolSession.writeFrame(response);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index c32f5e4283..ea93a357d1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -75,25 +75,43 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
// throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
_logger.info("Authentication failed");
stateManager.changeState(AMQState.CONNECTION_CLOSING);
- AMQFrame close = ConnectionCloseBody.createAMQFrame(0, AMQConstant.NOT_ALLOWED.getCode(),
- AMQConstant.NOT_ALLOWED.getName(),
- ConnectionCloseBody.CLASS_ID,
- ConnectionCloseBody.METHOD_ID);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ConnectionCloseBody.getClazz((byte)8, (byte)0), // classId
+ ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName()); // replyText
protocolSession.writeFrame(close);
disposeSaslServer(protocolSession);
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE,
- ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
- HeartbeatConfig.getInstance().getDelay());
+ // TODO: Check the value of channelMax here: This should be the max
+ // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
+ // not Integer.MAX_VALUE (which is signed 4 bytes).
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ Integer.MAX_VALUE, // channelMax
+ ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
protocolSession.writeFrame(tune);
disposeSaslServer(protocolSession);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ authResult.challenge); // challenge
protocolSession.writeFrame(challenge);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 79b2e11bca..9f24100df1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -92,13 +92,24 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
_logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
- HeartbeatConfig.getInstance().getDelay());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ Integer.MAX_VALUE, // channelMax
+ getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
protocolSession.writeFrame(tune);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ authResult.challenge); // challenge
protocolSession.writeFrame(challenge);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 5aaf78d6b7..30e8990b54 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -64,6 +64,11 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ byte major = (byte)8;
+ byte minor = (byte)0;
+
ExchangeBoundBody body = evt.getMethod();
String exchangeName = body.exchange;
@@ -77,8 +82,11 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
AMQFrame response;
if (exchange == null)
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
- "Exchange " + exchangeName + " not found");
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ EXCHANGE_NOT_FOUND, // replyCode
+ "Exchange " + exchangeName + " not found"); // replyText
}
else if (routingKey == null)
{
@@ -86,11 +94,19 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
if (exchange.hasBindings())
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ NO_BINDINGS, // replyCode
+ null); // replyText
}
}
else
@@ -98,20 +114,29 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
- "Queue " + queueName + " not found");
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_FOUND, // replyCode
+ "Queue " + queueName + " not found"); // replyText
}
else
{
if (exchange.isBound(queue))
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
- "Queue " + queueName + " not bound to exchange " +
- exchangeName);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_BOUND, // replyCode
+ "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText
}
}
}
@@ -121,24 +146,30 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
- "Queue " + queueName + " not found");
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_FOUND, // replyCode
+ "Queue " + queueName + " not found"); // replyText
}
else
{
if (exchange.isBound(body.routingKey, queue))
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
- null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
- "Queue " + queueName +
- " not bound with routing key " +
- body.routingKey + " to exchange " +
- exchangeName);
+ major, minor, // AMQP version (major, minor)
+ SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ "Queue " + queueName + " not bound with routing key " +
+ body.routingKey + " to exchange " + exchangeName); // replyText
}
}
}
@@ -146,16 +177,20 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
if (exchange.isBound(body.routingKey))
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
- null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- NO_QUEUE_BOUND_WITH_RK,
- "No queue bound with routing key " +
- body.routingKey + " to exchange " +
- exchangeName);
+ major, minor, // AMQP version (major, minor)
+ NO_QUEUE_BOUND_WITH_RK, // replyCode
+ "No queue bound with routing key " + body.routingKey +
+ " to exchange " + exchangeName); // replyText
}
}
protocolSession.writeFrame(response);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index b7c75e290a..7937a9bb2d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -75,7 +75,10 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
}
if(!body.nowait)
{
- AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index 93ef902190..153a9de4c4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -53,7 +53,10 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
try
{
exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
- AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
catch (ExchangeInUseException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index cf9e40a660..b7fc786981 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -90,7 +90,10 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
}
if (!body.nowait)
{
- final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index b7004de2a9..83f98de2d9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -102,7 +102,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
if (!body.nowait)
{
- AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), body.queue, 0L, 0L);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0L, // consumerCount
+ 0L, // messageCount
+ body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
protocolSession.writeFrame(response);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 0dbc54f29b..688968b8a0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -81,7 +81,12 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
{
int purged = queue.delete(body.ifUnused, body.ifEmpty);
_store.removeQueue(queue.getName());
- session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), purged));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ purged)); // messageCount
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index ac864cab6c..7fcad5bbf3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -52,7 +52,10 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
try{
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.commit();
- protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
channel.processReturns(protocolSession);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 475f6ecacf..588dc026d4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -51,7 +51,10 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
try{
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.rollback();
- protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
channel.resend(protocolSession);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
index c30bc7d66f..7df3825d8a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -48,6 +48,9 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
- protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 7a9dfbc67c..9ff6b96690 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -165,8 +165,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_minor = pi.protocolMinor;
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
- mechanisms.getBytes(), locales.getBytes());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short)8, // versionMajor
+ (short)0 // versionMinor
+ );
_minaProtocolSession.write(response);
}
catch (AMQException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 18980f440b..2e9590277b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -168,11 +168,20 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
}
else if(throwable instanceof IOException)
{
- _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
+ _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
}
else
{
- protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ 200, // replyCode
+ throwable.getMessage() // replyText
+ ));
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
protocolSession.close();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index d57f9b9be1..0ceadcb30b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -193,8 +193,16 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public void closeConnection() throws JMException
{
- final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(),
- "Broker Management Console has closing the connection.", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "Broker Management Console has closing the connection." // replyText
+ );
_session.writeFrame(response);
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index b27cd807c0..afe4ea95b9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -157,10 +157,20 @@ public class AMQMessage
public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
{
+
AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
- allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag, deliveryTag, _redelivered,
- getExchangeName(), getRoutingKey());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ getExchangeName(), // exchange
+ _redelivered, // redelivered
+ getRoutingKey() // routingKey
+ );
allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
for (int i = 2; i < allFrames.length; i++)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 4272541298..78310e8eb3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -379,7 +379,13 @@ public class SubscriptionImpl implements Subscription
if (!_closed)
{
_logger.info("Closing autoclose subscription:" + this);
- protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag // consumerTag
+ ));
_closed = true;
}
}
@@ -392,9 +398,17 @@ public class SubscriptionImpl implements Subscription
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
- deliveryTag, false, exchange,
- routingKey);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ exchange, // exchange
+ false, // redelivered
+ routingKey // routingKey
+ );
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
deliverFrame.writePayload(buf);
buf.flip();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 820b8c3f83..58ac49dd4e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -465,12 +465,25 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+ ChannelOpenBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null), // outOfBand
+ ChannelOpenOkBody.class);
//todo send low water mark when protocol allows.
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false),
+ BasicQosBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false, // global
+ prefetchHigh, // prefetchCount
+ 0), // prefetchSize
BasicQosOkBody.class);
if (transacted)
@@ -479,7 +492,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.debug("Issuing TxSelect for " + channelId);
}
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 2136d565f1..8f85aedb1f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -477,7 +477,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// Commits outstanding messages sent and outstanding acknowledgements.
- _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -492,8 +495,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkTransacted();
try
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class);
+ TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -516,8 +522,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
_connection.getProtocolHandler().closeSession(this);
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(
- getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "JMS client closing channel"); // replyText
_connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -707,7 +720,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
consumer.clearUnackedMessages();
}
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -1039,7 +1057,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void declareExchangeSynch(String name, String type) throws AMQException
{
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ false, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
_connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
@@ -1050,7 +1081,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
{
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
protocolHandler.writeFrame(exchangeDeclare);
}
@@ -1072,9 +1116,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
amqd.setQueueName(protocolHandler.generateQueueName());
}
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(),
- false, amqd.isDurable(), amqd.isExclusive(),
- amqd.isAutoDelete(), true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ true, // nowait
+ false, // passive
+ amqd.getQueueName(), // queue
+ 0); // ticket
protocolHandler.writeFrame(queueDeclare);
return amqd.getQueueName();
@@ -1082,9 +1136,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
{
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0,
- queueName, amqd.getExchangeName(),
- amqd.getRoutingKey(), true, ft);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ft, // arguments
+ amqd.getExchangeName(), // exchange
+ true, // nowait
+ queueName, // queue
+ amqd.getRoutingKey(), // routingKey
+ 0); // ticket
protocolHandler.writeFrame(queueBind);
}
@@ -1122,10 +1184,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
- queueName, tag, consumer.isNoLocal(),
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(), nowait, arguments);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ arguments, // arguments
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ nowait, // nowait
+ queueName, // queue
+ 0); // ticket
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1302,8 +1373,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
try
{
- AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false,
- false, true);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false, // ifEmpty
+ false, // ifUnused
+ true, // nowait
+ queueName, // queue
+ 0); // ticket
_connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
catch (AMQException e)
@@ -1389,8 +1468,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean isQueueBound(String queueName, String routingKey) throws JMSException
{
- AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- routingKey, queueName);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
+ queueName, // queue
+ routingKey); // routingKey
AMQMethodEvent response = null;
try
{
@@ -1447,7 +1532,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ deliveryTag, // deliveryTag
+ multiple); // multiple
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
@@ -1606,14 +1697,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void suspendChannel()
{
_logger.warn("Suspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
private void unsuspendChannel()
{
_logger.warn("Unsuspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ true); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index cefaca8d52..1033e827de 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -448,7 +448,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if(sendClose)
{
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ _consumerTag, // consumerTag
+ false); // nowait
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 7a5fcbccf9..d38e461400 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -134,9 +134,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
- AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getExchangeClass(), false,
- false, false, false, true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -512,8 +523,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
AbstractJMSMessage message = convertToNativeMessage(origMessage);
message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
- AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getRoutingKey(), mandatory, immediate);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ destination.getExchangeName(), // exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(), // routingKey
+ 0); // ticket
long currentTime = 0;
if (!_disableTimestamps)
@@ -555,7 +574,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
// weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0,
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0,
contentHeaderProperties,
size);
if (_logger.isDebugEnabled())
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index fd2968cdfd..278f0906ea 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -57,7 +57,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
evt.getProtocolSession().writeFrame(frame);
if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index dd9fd651c1..bbfb100b25 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -59,7 +59,10 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
String reason = method.replyText;
// TODO: check whether channel id of zero is appropriate
- evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)8, (byte)0));
if (errorCode != 200)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
index fd15faf429..153b641a39 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
@@ -54,7 +54,12 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
{
// Evaluate server challenge
byte[] response = client.evaluateChallenge(body.challenge);
- AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ response); // response
evt.getProtocolSession().writeFrame(responseFrame);
}
catch (SaslException e)
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index cfc3c2898b..8640bbb999 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -126,8 +126,15 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
- ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
- saslResponse, selectedLocale));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ clientProperties, // clientProperties
+ selectedLocale, // locale
+ mechanism, // mechanism
+ saslResponse)); // response
}
catch (UnsupportedEncodingException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index 8fee277392..3592ee4c53 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -72,11 +72,25 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
{
- return ConnectionOpenBody.createAMQFrame(channel, path, capabilities, insist);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ return ConnectionOpenBody.createAMQFrame(channel,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ capabilities, // capabilities
+ insist, // insist
+ path); // virtualHost
}
protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params)
{
- return ConnectionTuneOkBody.createAMQFrame(channel, params.getChannelMax(), params.getFrameMax(), params.getHeartbeat());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ return ConnectionTuneOkBody.createAMQFrame(channel,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ params.getChannelMax(), // channelMax
+ params.getFrameMax(), // frameMax
+ params.getHeartbeat()); // heartbeat
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index eab9084717..f37af835e1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -472,8 +472,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
- final AMQFrame frame = ConnectionCloseBody.createAMQFrame(
- 0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "JMS client is closing the connection."); // replyText
syncWrite(frame, ConnectionCloseOkBody.class);
_protocolSession.closeProtocolSession();
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index 07d572d27f..5209df59cd 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -112,7 +112,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
private void ping(Broker b) throws AMQException
{
- ClusterPingBody ping = new ClusterPingBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0);
ping.broker = _group.getLocal().getDetails();
ping.responseRequired = true;
ping.load = _loadTable.getLocalLoad();
@@ -158,7 +160,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
Broker leader = connectToLeader(member);
_logger.info(new LogMessage("Connected to {0}. joining", leader));
- ClusterJoinBody join = new ClusterJoinBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0);
join.broker = _group.getLocal().getDetails();
send(leader, new SimpleSendable(join));
}
@@ -177,7 +181,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
public void leave() throws AMQException
{
- ClusterLeaveBody leave = new ClusterLeaveBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0);
leave.broker = _group.getLocal().getDetails();
send(getLeader(), new SimpleSendable(leave));
}
@@ -198,7 +204,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
}
else
{
- ClusterSuspectBody suspect = new ClusterSuspectBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0);
suspect.broker = broker.getDetails();
send(getLeader(), new SimpleSendable(suspect));
}
@@ -220,7 +228,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
else
{
//pass request on to leader:
- ClusterJoinBody request = new ClusterJoinBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0);
request.broker = member.getDetails();
Broker leader = getLeader();
send(leader, new SimpleSendable(request));
@@ -265,7 +275,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
private ClusterMembershipBody createAnnouncement(String membership)
{
- ClusterMembershipBody announce = new ClusterMembershipBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0);
//TODO: revise this way of converting String to bytes...
announce.members = membership.getBytes();
return announce;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
index 0836e9d5fa..93515e42b6 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
@@ -48,7 +48,13 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), evt.getMethod().queue));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ evt.getMethod().queue // consumerTag
+ ));
}
else
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
index 3bd9f5d387..832e4830ab 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
@@ -51,7 +51,9 @@ class ConsumerCounts
{
for(String queue : _counts.keySet())
{
- BasicConsumeBody m = new BasicConsumeBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0);
m.queue = queue;
m.consumerTag = queue;
replay(m, messages);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
index 4a00b5cbc3..ce3e71f0a5 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
@@ -44,15 +44,19 @@ import java.util.Arrays;
public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ private final byte major = (byte)8;
+ private final byte minor = (byte)0;
private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
{
- new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody()),
- new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody()),
- new FrameDescriptor(QueueBindBody.class, new QueueBindBody()),
- new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody()),
- new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody()),
- new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody()),
- new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody())
+ new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor)),
+ new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor)),
+ new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)),
+ new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)),
+ new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)),
+ new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)),
+ new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor))
});
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index fa737cd1b6..338817e892 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -124,7 +124,9 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
}
}
_consumers.replay(methods);
- methods.add(new ClusterSynchBody());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ methods.add(new ClusterSynchBody((byte)8, (byte)0));
return methods;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index ee16f6062f..8765aebf77 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -75,7 +75,9 @@ public class ClusteredQueue extends AMQQueue
delete();
//send deletion request to all other members:
- QueueDeleteBody request = new QueueDeleteBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0);
request.queue = getName();
_groupMgr.broadcast(new SimpleSendable(request));
}
@@ -87,7 +89,9 @@ public class ClusteredQueue extends AMQQueue
super.unregisterProtocolSession(ps, channel, consumerTag);
//signal other members:
- BasicCancelBody request = new BasicCancelBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0);
request.consumerTag = getName();
_groupMgr.broadcast(new SimpleSendable(request));
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
index a3af0fedc7..94f17cb9d3 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
@@ -56,7 +56,9 @@ public class PrivateQueue extends AMQQueue
super.autodelete();
//send delete request to peers:
- QueueDeleteBody request = new QueueDeleteBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0);
request.queue = getName();
_groupMgr.broadcast(new SimpleSendable(request));
}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
index f7fe5dc35a..ed18710c64 100644
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
@@ -148,6 +148,9 @@ public class BrokerTest extends TestCase
TestMethod(Object id)
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ super((byte)8, (byte)0);
this.id = id;
}
diff --git a/java/common/pom.xml b/java/common/pom.xml
index 653b2a8a9d..053fb5fafb 100644
--- a/java/common/pom.xml
+++ b/java/common/pom.xml
@@ -35,14 +35,12 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
- <cluster.asl>${basedir}/src/main/xsl/cluster.asl</cluster.asl>
- <spec.stylesheet>${basedir}/src/main/xsl/framing.xsl</spec.stylesheet>
- <registry.stylesheet>${basedir}/src/main/xsl/registry.xsl</registry.stylesheet>
- <registry.template>${basedir}/src/main/xsl/registry.template</registry.template>
+ <gentools.home>${topDirectoryLocation}/../gentools</gentools.home>
<generated.path>${project.build.directory}/generated-sources/xsl</generated.path>
<generated.package>org/apache/qpid/framing</generated.package>
<generated.dir>${generated.path}/${generated.package}</generated.dir>
<specs.dir>${topDirectoryLocation}/../specs</specs.dir>
+ <cluster.asl>${basedir}/src/main/xsl/cluster.asl</cluster.asl>
</properties>
<build>
@@ -57,13 +55,10 @@
<configuration>
<tasks>
<ant antfile="protocol-version.xml">
- <property name="cluster.asl" value="${cluster.asl}"/>
- <property name="spec.stylesheet" value="${spec.stylesheet}"/>
- <property name="registry.stylesheet" value="${registry.stylesheet}"/>
- <property name="registry.template" value="${registry.template}"/>
+ <property name="gentools.home" value="${gentools.home}"/>
<property name="generated.dir" value="${generated.dir}"/>
- <property name="proto_version" value="${generated.dir}/ProtocolVersionList.java"/>
- <property name="specs.dir" value="${specs.dir}"/>
+ <property name="cluster.asl" value="${cluster.asl}"/>
+ <property name="xml.spec.list" value="${specs.dir}/amqp-8.0.xml ${cluster.asl}"/>
</ant>
</tasks>
<sourceRoot>${generated.path}</sourceRoot>
diff --git a/java/common/protocol-version.xml b/java/common/protocol-version.xml
index 96ce348523..6a92dfbe2b 100644
--- a/java/common/protocol-version.xml
+++ b/java/common/protocol-version.xml
@@ -20,102 +20,21 @@
-->
<project name="Qpid Common Protocol Versions" default="generate">
- <property name="saxon.jar" value="lib/saxon/saxon8.jar"/>
- <!-- temporarily hard-wired XML spec version for build avoidance -->
- <property name="amqp.xml" value="${specs.dir}/amqp-8.0.xml"/>
-
- <macrodef name="saxon">
- <attribute name="out"/>
- <attribute name="src"/>
- <attribute name="xsl"/>
- <element name="args" implicit="true" optional="true"/>
- <sequential>
- <java jar="${saxon.jar}" fork="true">
- <arg value="-o"/>
- <arg value="@{out}"/>
- <arg value="@{src}"/>
- <arg value="@{xsl}"/>
- <args/>
- </java>
- </sequential>
- </macrodef>
-
- <macrodef name="amqp">
- <attribute name="ver"/>
- <sequential>
- <!-- Check for the existence of the AMQP specification file -->
- <property name="amqpspecfile-@{ver}" value="${specs.dir}/amqp-@{ver}.xml"/>
- <available file="${specs.dir}/amqp-@{ver}.xml"
- property="amqpspecfile.present-@{ver}"/>
- <fail unless="amqpspecfile.present-@{ver}"
- message="ERROR: AMQP specification file ${specs.dir}/amqp-@{ver}.xml not found."/>
-
- <!-- Read in the file as a set of properties; extract the amqp version -->
- <xmlproperty prefix="@{ver}" file="${specs.dir}/amqp-@{ver}.xml"/>
- <echo>Found AMQP specification file "${specs.dir}/amqp-@{ver}.xml"; major=${@{ver}.amqp(major)} minor=${@{ver}.amqp(minor)}</echo>
-
- <!-- Add the version to the ProtocolVersionList.java file -->
- <replaceregexp file="${proto_version}" match=" // !VER!"
- replace=",${line.separator} {${@{ver}.amqp(major)}, ${@{ver}.amqp(minor)}} // !VER!"
- flags="s" byline="true"/>
- <replaceregexp file="${proto_version}" match=" // !VER1!"
- replace="{${@{ver}.amqp(major)}, ${@{ver}.amqp(minor)}} // !VER!"
- flags="s" byline="true"/>
-
- <!-- Create directory; generate from specification file -->
- <saxon out="${generated.dir}/results.out"
- src="${specs.dir}/amqp-@{ver}.xml"
- xsl="${spec.stylesheet}">
- <arg value="major=${@{ver}.amqp(major)}"/>
- <arg value="minor=${@{ver}.amqp(minor)}"/>
- <arg value="registry_name=MainRegistry"/>
- </saxon>
- <!-- -->
- <saxon out="${generated.dir}/cluster.out"
- src="${cluster.asl}"
- xsl="${spec.stylesheet}">
- <arg value="major=${@{ver}.amqp(major)}"/>
- <arg value="minor=${@{ver}.amqp(minor)}"/>
- <arg value="registry_name=ClusterRegistry"/>
- </saxon>
- <saxon out="${generated.dir}/registry.out"
- src="${registry.template}"
- xsl="${registry.stylesheet}">
- <arg value="major=${@{ver}.amqp(major)}"/>
- <arg value="minor=${@{ver}.amqp(minor)}"/>
- </saxon>
- </sequential>
- </macrodef>
-
- <uptodate property="generated" targetfile="${generated.dir}/results.out"
- srcfile="${amqp.xml}"/>
-
- <target name="generate" unless="generated">
+<!--
+ <property name="specs.dir" value="../../specs"/>
+ <property name="gentools.home" value="../../gentools"/>
+ <property name="generated.dir" value="target/generated-sources/xsl/org/apache/qpid/framing"/>
+ <property name="cluster.asl" value="src/main/xsl/cluster.asl"/>
+ <property name="xml.spec.list" value="${specs.dir}/amqp-8.0.xml ${cluster.asl}"/>
+-->
+
+ <target name="generate">
<mkdir dir="${generated.dir}"/>
- <copy file="src/main/versions/ProtocolVersionList.java.tmpl" tofile="${proto_version}"
- overwrite="true"/>
- <!--
- NOTE: Set the AMQP version numbers to be supported in this build here.
- The last version in this list will be the version returned when a protocol
- ProtocolInitiation NAK frame is returned by the broker. Usually this is the
- highest or most recent version.
- -->
- <!-- <amqp ver="0.8"/>
- <amqp ver="0.9"/>
- <amqp ver="0.10"/> -->
- <amqp ver="8.0"/>
-
-<!-- <saxon out="${generated.dir}/results.out" src="${amqp.xml}"
- xsl="${stylesheet}">
- <arg value="asl_base=${asl.base}"/>
- <arg value="registry_name=MainRegistry"/>
- </saxon>
- <saxon out="${generated.dir}/cluster.out" src="${cluster.asl}"
- xsl="${stylesheet}">
- <arg value="registry_name=ClusterRegistry"/>
- </saxon>
- <saxon out="${generated.dir}/registry.out" src="${registry_template}"
- xsl="${registry_stylesheet}"/> -->
+ <exec dir="${gentools.home}/src" executable="pwd" />
+ <echo>XML files to be processed: ${xml.spec.list}</echo>
+ <java classname="org.apache.qpid.gentools.Main" fork="true" dir="${gentools.home}/src">
+ <arg line="-j -o ${generated.dir} -t ${gentools.home}/templ.java ${xml.spec.list}"/>
+ </java>
</target>
<target name="precompile" depends="generate"/>
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
index 4d604f8c0b..2ead0a03e6 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -27,23 +27,30 @@ public class AMQChannelException extends AMQException
{
private final int _classId;
private final int _methodId;
+ /* AMQP version for which exception ocurred */
+ private final byte major;
+ private final byte minor;
- public AMQChannelException(int errorCode, String msg, int classId, int methodId, Throwable t)
+ public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
{
super(errorCode, msg, t);
_classId = classId;
_methodId = methodId;
+ this.major = major;
+ this.minor = minor;
}
- public AMQChannelException(int errorCode, String msg, int classId, int methodId)
+ public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor)
{
super(errorCode, msg);
_classId = classId;
_methodId = methodId;
+ this.major = major;
+ this.minor = minor;
}
public AMQFrame getCloseFrame(int channel)
{
- return ChannelCloseBody.createAMQFrame(channel, getErrorCode(), getMessage(), _classId, _methodId);
+ return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), getMessage());
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
index d829144b11..36287d2923 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -34,5 +34,6 @@ public abstract class AMQBody
protected abstract void writePayload(ByteBuffer buffer);
- protected abstract void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ protected abstract void populateFromBuffer(ByteBuffer buffer, long size)
+ throws AMQFrameDecodingException, AMQProtocolVersionException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 438bfa8d82..2a999fe130 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -81,7 +81,7 @@ public class AMQDataBlockDecoder
}
protected Object createAndPopulateFrame(ByteBuffer in)
- throws AMQFrameDecodingException
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
final byte type = in.get();
if (!isSupportedFrameType(type))
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index e75f37d623..6af691fbe8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -62,7 +62,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
* @throws AMQFrameDecodingException
*/
public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory)
- throws AMQFrameDecodingException
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
this.channel = channel;
bodyFrame = bodyFactory.createBody(buffer);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index 6659b4ff8f..5ccc900b2c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -26,6 +26,20 @@ import org.apache.qpid.AMQChannelException;
public abstract class AMQMethodBody extends AMQBody
{
public static final byte TYPE = 1;
+
+ /**
+ * AMQP version
+ */
+ protected byte major;
+ protected byte minor;
+ public byte getMajor() { return major; }
+ public byte getMinor() { return minor; }
+
+ public AMQMethodBody(byte major, byte minor)
+ {
+ this.major = major;
+ this.minor = minor;
+ }
/** unsigned short */
protected abstract int getBodySize();
@@ -80,11 +94,11 @@ public abstract class AMQMethodBody extends AMQBody
*/
public AMQChannelException getChannelException(int code, String message)
{
- return new AMQChannelException(code, message, getClazz(), getMethod());
+ return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor);
}
public AMQChannelException getChannelException(int code, String message, Throwable cause)
{
- return new AMQChannelException(code, message, getClazz(), getMethod(), cause);
+ return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
index 107af67dc7..da0909d32f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
@@ -41,6 +41,11 @@ public class AMQMethodBodyFactory implements BodyFactory
public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
{
- return MethodBodyDecoderRegistry.get(in.getUnsignedShort(), in.getUnsignedShort());
+ // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML
+ // segments generated together are now handled by MainRegistry. The Cluster class,
+ // if generated together with amqp.xml is a part of MainRegistry.
+ // TODO: Connect with version acquired from ProtocolInitiation class.
+ return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(),
+ (byte)8, (byte)0);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index 61837f65cc..fc80d93f82 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -245,7 +245,7 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
}
public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
- throws AMQFrameDecodingException
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
_propertyFlags = propertyFlags;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index a59869b1d8..4ee36ee831 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -58,7 +58,8 @@ public class ContentHeaderBody extends AMQBody
return TYPE;
}
- protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ protected void populateFromBuffer(ByteBuffer buffer, long size)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
classId = buffer.getUnsignedShort();
weight = buffer.getUnsignedShort();
@@ -75,7 +76,8 @@ public class ContentHeaderBody extends AMQBody
* @return
* @throws AMQFrameDecodingException
*/
- public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
ContentHeaderBody body = new ContentHeaderBody();
body.populateFromBuffer(buffer, size);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
index 561d7852fd..88bdefca88 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
@@ -41,7 +41,7 @@ public interface ContentHeaderProperties
* @throws AMQFrameDecodingException when the buffer does not contain valid data
*/
void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
- throws AMQFrameDecodingException;
+ throws AMQFrameDecodingException, AMQProtocolVersionException;
/**
* @return the size of the encoded property list in bytes.
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
index cec413cb9d..cfcc5db857 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
@@ -37,16 +37,19 @@ public class ContentHeaderPropertiesFactory
public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
ByteBuffer buffer, int size)
- throws AMQFrameDecodingException
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
ContentHeaderProperties properties;
- switch (classId)
+ // AMQP version change: "Hardwired" version to major=8, minor=0
+ // TODO: Change so that the actual version is obtained from
+ // the ProtocolInitiation object for this session.
+ if (classId == BasicConsumeBody.getClazz((byte)8, (byte)0))
{
- case BasicConsumeBody.CLASS_ID:
- properties = new BasicContentHeaderProperties();
- break;
- default:
- throw new AMQFrameDecodingException("Unsupport content header class id: " + classId);
+ properties = new BasicContentHeaderProperties();
+ }
+ else
+ {
+ throw new AMQFrameDecodingException("Unsupport content header class id: " + classId);
}
properties.populatePropertiesFromBuffer(buffer, propertyFlags, size);
return properties;
diff --git a/java/common/src/main/xsl/cluster.asl b/java/common/src/main/xsl/cluster.asl
index 40ca937904..09e8ca0787 100644
--- a/java/common/src/main/xsl/cluster.asl
+++ b/java/common/src/main/xsl/cluster.asl
@@ -29,26 +29,26 @@
provide a clustered service to clients.
</doc>
-<method name = "join">
+<method name = "join" index="10">
<field name = "broker" type = "shortstr" />
</method>
-<method name = "membership">
+<method name = "membership" index="20">
<field name = "members" type = "longstr" />
</method>
-<method name = "synch">
+<method name = "synch" index="30">
</method>
-<method name = "leave">
+<method name = "leave" index="40">
<field name = "broker" type = "shortstr" />
</method>
-<method name = "suspect">
+<method name = "suspect" index="50">
<field name = "broker" type = "shortstr" />
</method>
-<method name = "ping">
+<method name = "ping" index="60">
<field name = "broker" type = "shortstr" />
<field name = "load" type = "long" />
<field name = "response required" type = "bit" />
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index bcf1602433..23f7f3d53b 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -140,7 +140,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
static BasicPublishBody getPublishRequest(String id)
{
- BasicPublishBody request = new BasicPublishBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Establish some way to determine the version for the test.
+ BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0);
request.routingKey = id;
return request;
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 8de3c8bf33..fafb87abd5 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -148,7 +148,9 @@ public class AMQQueueMBeanTest extends TestCase
private AMQMessage message(boolean immediate) throws AMQException
{
- BasicPublishBody publish = new BasicPublishBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Establish some way to determine the version for the test.
+ BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
publish.immediate = immediate;
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = 1000; // in bytes
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
index c2511f0a99..11bae0d9f6 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -80,7 +80,9 @@ public class AckTest extends TestCase
{
for (int i = 1; i <= count; i++)
{
- BasicPublishBody publishBody = new BasicPublishBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Establish some way to determine the version for the test.
+ BasicPublishBody publishBody = new BasicPublishBody((byte)8, (byte)0);
publishBody.routingKey = "rk";
publishBody.exchange = "someExchange";
AMQMessage msg = new AMQMessage(_messageStore, publishBody);
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
index 8570e6521f..6b764acd54 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -46,7 +46,9 @@ class MessageTestHelper extends TestCase
AMQMessage message(boolean immediate) throws AMQException
{
- BasicPublishBody publish = new BasicPublishBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Establish some way to determine the version for the test.
+ BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
publish.immediate = immediate;
return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), null);
}