diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2006-12-22 17:00:28 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2006-12-22 17:00:28 +0000 |
| commit | 5129ac060aed57d8e31a62c3cd64ff0ad8995949 (patch) | |
| tree | 12616b7ae0cc57d7c3fb88025fd05cf31686d51a /java/client/src | |
| parent | 142d35580b326c99a306f6476ff0a0b723db920e (diff) | |
| download | qpid-python-5129ac060aed57d8e31a62c3cd64ff0ad8995949.tar.gz | |
AMQP version using new generator - Part 1. In these changes, all places where version-specific info is required, it has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489691 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
10 files changed, 226 insertions, 43 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 820b8c3f83..58ac49dd4e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -465,12 +465,25 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class); + ChannelOpenBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null), // outOfBand + ChannelOpenOkBody.class); //todo send low water mark when protocol allows. + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false), + BasicQosBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + false, // global + prefetchHigh, // prefetchCount + 0), // prefetchSize BasicQosOkBody.class); if (transacted) @@ -479,7 +492,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.debug("Issuing TxSelect for " + channelId); } - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2136d565f1..8f85aedb1f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -477,7 +477,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Commits outstanding messages sent and outstanding acknowledgements. - _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class); } catch (AMQException e) { @@ -492,8 +495,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkTransacted(); try { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class); + TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class); } catch (AMQException e) { @@ -516,8 +522,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { _connection.getProtocolHandler().closeSession(this); - final AMQFrame frame = ChannelCloseBody.createAMQFrame( - getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + "JMS client closing channel"); // replyText _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -707,7 +720,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.clearUnackedMessages(); } - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -1039,7 +1057,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void declareExchangeSynch(String name, String type) throws AMQException { - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + false, // nowait + false, // passive + 0, // ticket + type); // type _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); } @@ -1050,7 +1081,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) { - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + true, // nowait + false, // passive + 0, // ticket + type); // type protocolHandler.writeFrame(exchangeDeclare); } @@ -1072,9 +1116,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqd.setQueueName(protocolHandler.generateQueueName()); } - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(), - false, amqd.isDurable(), amqd.isExclusive(), - amqd.isAutoDelete(), true, null); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + true, // nowait + false, // passive + amqd.getQueueName(), // queue + 0); // ticket protocolHandler.writeFrame(queueDeclare); return amqd.getQueueName(); @@ -1082,9 +1136,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException { - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0, - queueName, amqd.getExchangeName(), - amqd.getRoutingKey(), true, ft); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + ft, // arguments + amqd.getExchangeName(), // exchange + true, // nowait + queueName, // queue + amqd.getRoutingKey(), // routingKey + 0); // ticket protocolHandler.writeFrame(queueBind); } @@ -1122,10 +1184,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, - queueName, tag, consumer.isNoLocal(), - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, - consumer.isExclusive(), nowait, arguments); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + 0); // ticket if (nowait) { protocolHandler.writeFrame(jmsConsume); @@ -1302,8 +1373,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { try { - AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false, - false, true); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + 0); // ticket _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } catch (AMQException e) @@ -1389,8 +1468,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isQueueBound(String queueName, String routingKey) throws JMSException { - AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME, - routingKey, queueName); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange + queueName, // queue + routingKey); // routingKey AMQMethodEvent response = null; try { @@ -1447,7 +1532,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void acknowledgeMessage(long deliveryTag, boolean multiple) { - final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + deliveryTag, // deliveryTag + multiple); // multiple if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); @@ -1606,14 +1697,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void suspendChannel() { _logger.warn("Suspending channel"); - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + false); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } private void unsuspendChannel() { _logger.warn("Unsuspending channel"); - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + true); // active _connection.getProtocolHandler().writeFrame(channelFlowFrame); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index cefaca8d52..1033e827de 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -448,7 +448,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if(sendClose) { - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + _consumerTag, // consumerTag + false); // nowait try { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 7a5fcbccf9..d38e461400 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -134,9 +134,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false - AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), - destination.getExchangeClass(), false, - false, false, false, true, null); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + null, // arguments + false, // autoDelete + false, // durable + destination.getExchangeName(), // exchange + false, // internal + true, // nowait + false, // passive + 0, // ticket + destination.getExchangeClass()); // type _protocolHandler.writeFrame(declare); } @@ -512,8 +523,16 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j AbstractJMSMessage message = convertToNativeMessage(origMessage); message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL()); - AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), - destination.getRoutingKey(), mandatory, immediate); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + destination.getExchangeName(), // exchange + immediate, // immediate + mandatory, // mandatory + destination.getRoutingKey(), // routingKey + 0); // ticket long currentTime = 0; if (!_disableTimestamps) @@ -555,7 +574,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } // weight argument of zero indicates no child content headers, just bodies - AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0, + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0, contentHeaderProperties, size); if (_logger.isDebugEnabled()) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index fd2968cdfd..278f0906ea 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -57,7 +57,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); } - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); evt.getProtocolSession().writeFrame(frame); if (errorCode != AMQConstant.REPLY_SUCCESS.getCode()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index dd9fd651c1..bbfb100b25 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -59,7 +59,10 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener String reason = method.replyText; // TODO: check whether channel id of zero is appropriate - evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)8, (byte)0)); if (errorCode != 200) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java index fd15faf429..153b641a39 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java @@ -54,7 +54,12 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener { // Evaluate server challenge byte[] response = client.evaluateChallenge(body.challenge); - AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + response); // response evt.getProtocolSession().writeFrame(responseFrame); } catch (SaslException e) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index cfc3c2898b..8640bbb999 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -126,8 +126,15 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName()); clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion()); clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo()); - ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism, - saslResponse, selectedLocale)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + clientProperties, // clientProperties + selectedLocale, // locale + mechanism, // mechanism + saslResponse)); // response } catch (UnsupportedEncodingException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index 8fee277392..3592ee4c53 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -72,11 +72,25 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) { - return ConnectionOpenBody.createAMQFrame(channel, path, capabilities, insist); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + return ConnectionOpenBody.createAMQFrame(channel, + (byte)8, (byte)0, // AMQP version (major, minor) + capabilities, // capabilities + insist, // insist + path); // virtualHost } protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params) { - return ConnectionTuneOkBody.createAMQFrame(channel, params.getChannelMax(), params.getFrameMax(), params.getHeartbeat()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + return ConnectionTuneOkBody.createAMQFrame(channel, + (byte)8, (byte)0, // AMQP version (major, minor) + params.getChannelMax(), // channelMax + params.getFrameMax(), // frameMax + params.getHeartbeat()); // heartbeat } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index eab9084717..f37af835e1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -472,8 +472,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _stateManager.changeState(AMQState.CONNECTION_CLOSING); - final AMQFrame frame = ConnectionCloseBody.createAMQFrame( - 0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + "JMS client is closing the connection."); // replyText syncWrite(frame, ConnectionCloseOkBody.class); _protocolSession.closeProtocolSession(); |
