summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2006-12-22 17:00:28 +0000
committerKim van der Riet <kpvdr@apache.org>2006-12-22 17:00:28 +0000
commit5129ac060aed57d8e31a62c3cd64ff0ad8995949 (patch)
tree12616b7ae0cc57d7c3fb88025fd05cf31686d51a /java/client/src
parent142d35580b326c99a306f6476ff0a0b723db920e (diff)
downloadqpid-python-5129ac060aed57d8e31a62c3cd64ff0ad8995949.tar.gz
AMQP version using new generator - Part 1. In these changes, all places where version-specific info is required, it has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489691 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java149
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java11
10 files changed, 226 insertions, 43 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 820b8c3f83..58ac49dd4e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -465,12 +465,25 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+ ChannelOpenBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null), // outOfBand
+ ChannelOpenOkBody.class);
//todo send low water mark when protocol allows.
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false),
+ BasicQosBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false, // global
+ prefetchHigh, // prefetchCount
+ 0), // prefetchSize
BasicQosOkBody.class);
if (transacted)
@@ -479,7 +492,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.debug("Issuing TxSelect for " + channelId);
}
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 2136d565f1..8f85aedb1f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -477,7 +477,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// Commits outstanding messages sent and outstanding acknowledgements.
- _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -492,8 +495,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkTransacted();
try
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class);
+ TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -516,8 +522,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
_connection.getProtocolHandler().closeSession(this);
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(
- getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "JMS client closing channel"); // replyText
_connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -707,7 +720,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
consumer.clearUnackedMessages();
}
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -1039,7 +1057,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void declareExchangeSynch(String name, String type) throws AMQException
{
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ false, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
_connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
@@ -1050,7 +1081,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
{
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
protocolHandler.writeFrame(exchangeDeclare);
}
@@ -1072,9 +1116,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
amqd.setQueueName(protocolHandler.generateQueueName());
}
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(),
- false, amqd.isDurable(), amqd.isExclusive(),
- amqd.isAutoDelete(), true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ true, // nowait
+ false, // passive
+ amqd.getQueueName(), // queue
+ 0); // ticket
protocolHandler.writeFrame(queueDeclare);
return amqd.getQueueName();
@@ -1082,9 +1136,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
{
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0,
- queueName, amqd.getExchangeName(),
- amqd.getRoutingKey(), true, ft);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ft, // arguments
+ amqd.getExchangeName(), // exchange
+ true, // nowait
+ queueName, // queue
+ amqd.getRoutingKey(), // routingKey
+ 0); // ticket
protocolHandler.writeFrame(queueBind);
}
@@ -1122,10 +1184,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
- queueName, tag, consumer.isNoLocal(),
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(), nowait, arguments);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ arguments, // arguments
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ nowait, // nowait
+ queueName, // queue
+ 0); // ticket
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1302,8 +1373,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
try
{
- AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false,
- false, true);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false, // ifEmpty
+ false, // ifUnused
+ true, // nowait
+ queueName, // queue
+ 0); // ticket
_connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
catch (AMQException e)
@@ -1389,8 +1468,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean isQueueBound(String queueName, String routingKey) throws JMSException
{
- AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- routingKey, queueName);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
+ queueName, // queue
+ routingKey); // routingKey
AMQMethodEvent response = null;
try
{
@@ -1447,7 +1532,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ deliveryTag, // deliveryTag
+ multiple); // multiple
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
@@ -1606,14 +1697,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void suspendChannel()
{
_logger.warn("Suspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
private void unsuspendChannel()
{
_logger.warn("Unsuspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ true); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index cefaca8d52..1033e827de 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -448,7 +448,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if(sendClose)
{
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ _consumerTag, // consumerTag
+ false); // nowait
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 7a5fcbccf9..d38e461400 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -134,9 +134,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
- AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getExchangeClass(), false,
- false, false, false, true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -512,8 +523,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
AbstractJMSMessage message = convertToNativeMessage(origMessage);
message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
- AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getRoutingKey(), mandatory, immediate);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ destination.getExchangeName(), // exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(), // routingKey
+ 0); // ticket
long currentTime = 0;
if (!_disableTimestamps)
@@ -555,7 +574,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
// weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0,
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0,
contentHeaderProperties,
size);
if (_logger.isDebugEnabled())
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index fd2968cdfd..278f0906ea 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -57,7 +57,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
evt.getProtocolSession().writeFrame(frame);
if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index dd9fd651c1..bbfb100b25 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -59,7 +59,10 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
String reason = method.replyText;
// TODO: check whether channel id of zero is appropriate
- evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)8, (byte)0));
if (errorCode != 200)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
index fd15faf429..153b641a39 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
@@ -54,7 +54,12 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
{
// Evaluate server challenge
byte[] response = client.evaluateChallenge(body.challenge);
- AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ response); // response
evt.getProtocolSession().writeFrame(responseFrame);
}
catch (SaslException e)
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index cfc3c2898b..8640bbb999 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -126,8 +126,15 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
- ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
- saslResponse, selectedLocale));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ clientProperties, // clientProperties
+ selectedLocale, // locale
+ mechanism, // mechanism
+ saslResponse)); // response
}
catch (UnsupportedEncodingException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index 8fee277392..3592ee4c53 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -72,11 +72,25 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
{
- return ConnectionOpenBody.createAMQFrame(channel, path, capabilities, insist);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ return ConnectionOpenBody.createAMQFrame(channel,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ capabilities, // capabilities
+ insist, // insist
+ path); // virtualHost
}
protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params)
{
- return ConnectionTuneOkBody.createAMQFrame(channel, params.getChannelMax(), params.getFrameMax(), params.getHeartbeat());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ return ConnectionTuneOkBody.createAMQFrame(channel,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ params.getChannelMax(), // channelMax
+ params.getFrameMax(), // frameMax
+ params.getHeartbeat()); // heartbeat
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index eab9084717..f37af835e1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -472,8 +472,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
- final AMQFrame frame = ConnectionCloseBody.createAMQFrame(
- 0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "JMS client is closing the connection."); // replyText
syncWrite(frame, ConnectionCloseOkBody.class);
_protocolSession.closeProtocolSession();