diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-25 00:30:16 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-25 00:30:16 +0000 |
commit | e18eda8c992e8042f76ccb8bf2fed4b72489c44d (patch) | |
tree | 93347bab75bce78ed87177cae1f0594ac671635b | |
parent | 0af505cd8d90e903aedb131492f948031ac062e7 (diff) | |
download | qpid-python-e18eda8c992e8042f76ccb8bf2fed4b72489c44d.tar.gz |
QPID-318 : Patch supplied by Rob Godfrey - Remove hard-coding of protocol version number.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499628 13f79535-47bb-0310-9956-ffa450edef68
12 files changed, 251 insertions, 177 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 51b585ecc5..b8db7371b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -72,7 +72,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB // TODO - set clusterId
- session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte) 0, null));
+ session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, body.getMajor(), body.getMinor(), null));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 8cc747200f..d87821aa46 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -539,17 +539,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * NOTE: Both major and minor will be set to 0 prior to protocol initiation. */ - public byte getAmqpMajor() + public byte getProtocolMajorVersion() { return _major; } - public byte getAmqpMinor() + public byte getProtocolMinorVersion() { return _minor; } - public boolean amqpVersionEquals(byte major, byte minor) + public boolean isProtocolVersion(byte major, byte minor) { return _major == major && _minor == minor; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 48c05058b0..ed998b33c6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -35,6 +35,7 @@ public interface AMQProtocolSession extends AMQProtocolWriter { + public static interface Task { public void doTask(AMQProtocolSession session) throws AMQException; @@ -143,4 +144,8 @@ public interface AMQProtocolSession extends AMQProtocolWriter void removeSessionCloseTask(Task task); + byte getProtocolMajorVersion(); + + byte getProtocolMinorVersion(); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index c227cd5094..23a5da0a30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -541,7 +541,7 @@ public class AMQMessage public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { - ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag); + ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag); AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); @@ -585,7 +585,7 @@ public class AMQMessage public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException { - ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag, queueSize); + ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize); AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); @@ -627,11 +627,11 @@ public class AMQMessage } - private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag) + private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { BasicPublishBody pb = getPublishBody(); - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, (byte) 8, (byte) 0, consumerTag, + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, deliveryTag, pb.exchange, _messageHandle.isRedelivered(), pb.routingKey); ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? @@ -640,11 +640,13 @@ public class AMQMessage return buf; } - private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize) + private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException { BasicPublishBody pb = getPublishBody(); - AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte) 8, (byte) 0, + AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), deliveryTag, pb.exchange, queueSize, _messageHandle.isRedelivered(), @@ -655,9 +657,12 @@ public class AMQMessage return buf; } - private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException + private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException { - AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange, + AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + getPublishBody().exchange, replyCode, replyText, getPublishBody().routingKey); ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? @@ -669,7 +674,7 @@ public class AMQMessage public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException { - ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText); + ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText); AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 261efd4f3b..cc052f81df 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -480,22 +480,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + + // TODO: Be aware of possible changes to parameter order as versions change. + _protocolHandler.syncWrite( ChannelOpenBody.createAMQFrame(channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), null), // outOfBand ChannelOpenOkBody.class); //todo send low water mark when protocol allows. - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + //todo Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( BasicQosBody.createAMQFrame(channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), false, // global prefetchHigh, // prefetchCount 0), // prefetchSize @@ -507,10 +507,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.debug("Issuing TxSelect for " + channelId); } - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte) 8, (byte) 0), TxSelectOkBody.class); + + // TODO: Be aware of possible changes to parameter order as versions change. + _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, + _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion()), + TxSelectOkBody.class); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a7134736f0..4fd21b5480 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -556,10 +556,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Commits outstanding messages sent and outstanding acknowledgements. - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxCommitOkBody.class); + // TODO: Be aware of possible changes to parameter order as versions change. + final AMQProtocolHandler handler = getProtocolHandler(); + + handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion()), + TxCommitOkBody.class); } catch (AMQException e) { @@ -569,16 +572,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public void rollback() throws JMSException { checkTransacted(); try { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class); + // TODO: Be aware of possible changes to parameter order as versions change. + getProtocolHandler().syncWrite( + TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); } catch (AMQException e) { @@ -605,17 +607,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - _connection.getProtocolHandler().closeSession(this); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + + getProtocolHandler().closeSession(this); + // TODO: Be aware of possible changes to parameter order as versions change. final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) 0, // classId 0, // methodId AMQConstant.REPLY_SUCCESS.getCode(), // replyCode new AMQShortString("JMS client closing channel")); // replyText - _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); + getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -634,6 +635,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + private AMQProtocolHandler getProtocolHandler() + { + return _connection.getProtocolHandler(); + } + + + private byte getProtocolMinorVersion() + { + return getProtocolHandler().getProtocolMinorVersion(); + } + + private byte getProtocolMajorVersion() + { + return getProtocolHandler().getProtocolMajorVersion(); + } + + /** * Close all producers or consumers. This is called either in the error case or when closing the session normally. * @@ -818,11 +836,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.clearUnackedMessages(); } - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + // TODO: Be aware of possible changes to parameter order as versions change. + getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) false)); // requeue } @@ -934,7 +950,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotClosed(); long producerId = getNextProducerId(); BasicMessageProducer producer = new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, - AMQSession.this, _connection.getProtocolHandler(), + AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); registerProducer(producerId, producer); return producer; @@ -1102,7 +1118,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQDestination amqd = (AMQDestination) destination; - final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler(); + final AMQProtocolHandler protocolHandler = getProtocolHandler(); // TODO: construct the rawSelector from the selector string if rawSelector == null final FieldTable ft = FieldTableFactory.newFieldTable(); //if (rawSelector != null) @@ -1183,16 +1199,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void declareExchange(AMQShortString name, AMQShortString type) { - declareExchange(name, type, _connection.getProtocolHandler()); + declareExchange(name, type, getProtocolHandler()); } public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) null, // arguments false, // autoDelete false, // durable @@ -1202,7 +1216,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // passive 0, // ticket type); // type - _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); + getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); } private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) @@ -1212,11 +1226,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) null, // arguments false, // autoDelete false, // durable @@ -1247,11 +1259,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqd.setQueueName(protocolHandler.generateQueueName()); } - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) null, // arguments amqd.isAutoDelete(), // autoDelete amqd.isDurable(), // durable @@ -1267,11 +1277,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) ft, // arguments amqd.getExchangeName(), // exchange true, // nowait @@ -1315,11 +1323,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) arguments, // arguments tag, // consumerTag consumer.isExclusive(), // exclusive @@ -1513,17 +1519,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { try { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) false, // ifEmpty false, // ifUnused true, // nowait queueName, // queue 0); // ticket - _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } catch (AMQException e) { @@ -1608,18 +1612,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange queueName, // queue routingKey); // routingKey AMQMethodEvent response = null; try { - response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); } catch (AMQException e) { @@ -1672,18 +1674,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void acknowledgeMessage(long deliveryTag, boolean multiple) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) deliveryTag, // deliveryTag multiple); // multiple if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } - _connection.getProtocolHandler().writeFrame(ackFrame); + getProtocolHandler().writeFrame(ackFrame); } public int getDefaultPrefetch() @@ -1742,7 +1742,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { AMQDestination amqd = consumer.getDestination(); - AMQProtocolHandler protocolHandler = _connection.getProtocolHandler(); + AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler); @@ -1839,25 +1839,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void suspendChannel() { _logger.warn("Suspending channel"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) false); // active - _connection.getProtocolHandler().writeFrame(channelFlowFrame); + getProtocolHandler().writeFrame(channelFlowFrame); } private void unsuspendChannel() { _logger.warn("Unsuspending channel"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) true); // active - _connection.getProtocolHandler().writeFrame(channelFlowFrame); + getProtocolHandler().writeFrame(channelFlowFrame); } public void confirmConsumerCancelled(AMQShortString consumerTag) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 815cadb74d..e0d7db61cf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -467,11 +467,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (sendClose) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. + // TODO: Be aware of possible changes to parameter order as versions change. final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) + _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag false); // nowait diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java index da903e7c1d..090c77165d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java @@ -21,6 +21,7 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ConnectionOpenOkBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index d7fc86a5b0..477a679b90 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -60,82 +60,117 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener { ConnectionStartBody body = (ConnectionStartBody) evt.getMethod(); - try + byte major = (byte) body.versionMajor; + byte minor = (byte) body.versionMinor; + + if(checkVersionOK(major, minor)) { - // the mechanism we are going to use - String mechanism; - if (body.mechanisms == null) - { - throw new AMQException("mechanism not specified in ConnectionStart method frame"); - } - else - { - mechanism = chooseMechanism(body.mechanisms); - } - if (mechanism == null) - { - throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms)); - } + protocolSession.setProtocolVersion(major, minor); + - byte[] saslResponse; try { - SaslClient sc = Sasl.createSaslClient(new String[]{mechanism}, - null, "AMQP", "localhost", - null,createCallbackHandler(mechanism, protocolSession)); - if (sc == null) + // the mechanism we are going to use + String mechanism; + if (body.mechanisms == null) { - throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " + - mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " + - " details of how to register non-standard SASL client providers."); + throw new AMQException("mechanism not specified in ConnectionStart method frame"); + } + else + { + mechanism = chooseMechanism(body.mechanisms); } - protocolSession.setSaslClient(sc); - saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null); - } - catch (SaslException e) - { - protocolSession.setSaslClient(null); - throw new AMQException("Unable to create SASL client: " + e, e); - } - if (body.locales == null) - { - throw new AMQException("Locales is not defined in Connection Start method"); - } - final String locales = new String(body.locales, "utf8"); - final StringTokenizer tokenizer = new StringTokenizer(locales, " "); - String selectedLocale = null; - if (tokenizer.hasMoreTokens()) - { - selectedLocale = tokenizer.nextToken(); + if (mechanism == null) + { + throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms)); + } + + byte[] saslResponse; + try + { + SaslClient sc = Sasl.createSaslClient(new String[]{mechanism}, + null, "AMQP", "localhost", + null,createCallbackHandler(mechanism, protocolSession)); + if (sc == null) + { + throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " + + mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " + + " details of how to register non-standard SASL client providers."); + } + protocolSession.setSaslClient(sc); + saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null); + } + catch (SaslException e) + { + protocolSession.setSaslClient(null); + throw new AMQException("Unable to create SASL client: " + e, e); + } + + if (body.locales == null) + { + throw new AMQException("Locales is not defined in Connection Start method"); + } + final String locales = new String(body.locales, "utf8"); + final StringTokenizer tokenizer = new StringTokenizer(locales, " "); + String selectedLocale = null; + if (tokenizer.hasMoreTokens()) + { + selectedLocale = tokenizer.nextToken(); + } + else + { + throw new AMQException("No locales sent from server, passed: " + locales); + } + + stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); + FieldTable clientProperties = FieldTableFactory.newFieldTable(); + + clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID()); + clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName()); + clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion()); + clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + clientProperties, // clientProperties + new AMQShortString(selectedLocale), // locale + new AMQShortString(mechanism), // mechanism + saslResponse)); // response } - else + catch (UnsupportedEncodingException e) { - throw new AMQException("No locales sent from server, passed: " + locales); + throw new AMQException(_log, "Unable to decode data: " + e, e); } + } + else + { + _log.error("Broker requested Protocol [" + + body.versionMajor + + "-" + + body.versionMinor + + "] which is not supported by this version of the client library"); - stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - FieldTable clientProperties = FieldTableFactory.newFieldTable(); - - clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID()); - clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName()); - clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion()); - clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo()); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - clientProperties, // clientProperties - new AMQShortString(selectedLocale), // locale - new AMQShortString(mechanism), // mechanism - saslResponse)); // response + protocolSession.closeProtocolSession(); } - catch (UnsupportedEncodingException e) + } + + private boolean checkVersionOK(byte versionMajor, byte versionMinor) + { + byte[][] supportedVersions = ProtocolVersionList.pv; + boolean supported = false; + int i = supportedVersions.length; + while(i-- != 0 && !supported) { - throw new AMQException(_log, "Unable to decode data: " + e, e); + supported = (supportedVersions[i][ProtocolVersionList.PROTOCOL_MAJOR] == versionMajor) + && (supportedVersions[i][ProtocolVersionList.PROTOCOL_MINOR] == versionMinor); } + + return supported; } private String getFullSystemInfo() diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index fbf195d20e..4e7f8a3032 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -95,21 +95,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter public AMQProtocolHandler(AMQConnection con) { _connection = con; - - // We add a proxy for the state manager so that we can substitute the state manager easily in this class. - // We substitute the state manager when performing failover -/* _frameListeners.add(new AMQMethodListener() - { - public boolean methodReceived(AMQMethodEvent evt) throws AMQException - { - return _stateManager.methodReceived(evt); - } - - public void error(Exception e) - { - _stateManager.error(e); - } - });*/ } public boolean isUseSSL() @@ -152,7 +137,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void sessionOpened(IoSession session) throws Exception { - System.setProperty("foo", "bar"); + //System.setProperty("foo", "bar"); } /** @@ -526,7 +511,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, - (byte) 8, (byte) 0, // AMQP version (major, minor) + _protocolSession.getProtocolMajorVersion(), + _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) 0, // classId 0, // methodId AMQConstant.REPLY_SUCCESS.getCode(), // replyCode @@ -622,4 +608,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _failoverState = failoverState; } + + public byte getProtocolMajorVersion() + { + return _protocolSession.getProtocolMajorVersion(); + } + + + public byte getProtocolMinorVersion() + { + return _protocolSession.getProtocolMinorVersion(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index b6dd05d761..2399819a07 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -93,6 +93,12 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis protected int _queueId = 1; protected final Object _queueIdLock = new Object(); + private byte _protocolMinorVersion; + private byte _protocolMajorVersion; + + + + /** * No-arg constructor for use by test subclass - has to initialise final vars * NOT intended for use other then for test @@ -458,4 +464,22 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis session.confirmConsumerCancelled(consumerTag); } + + public void setProtocolVersion(byte versionMajor, byte versionMinor) + { + _protocolMajorVersion = versionMajor; + _protocolMinorVersion = versionMinor; + + } + + public byte getProtocolMinorVersion() + { + return _protocolMinorVersion; + } + + public byte getProtocolMajorVersion() + { + return _protocolMajorVersion; + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index 3f371161c6..0cfa4eddce 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -158,4 +158,14 @@ public class MockProtocolSession implements AMQProtocolSession { //To change body of implemented methods use File | Settings | File Templates. } + + public byte getProtocolMajorVersion() + { + return 8; //To change body of implemented methods use File | Settings | File Templates. + } + + public byte getProtocolMinorVersion() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } } |