summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-25 00:30:16 +0000
committerRobert Greig <rgreig@apache.org>2007-01-25 00:30:16 +0000
commite18eda8c992e8042f76ccb8bf2fed4b72489c44d (patch)
tree93347bab75bce78ed87177cae1f0594ac671635b
parent0af505cd8d90e903aedb131492f948031ac062e7 (diff)
downloadqpid-python-e18eda8c992e8042f76ccb8bf2fed4b72489c44d.tar.gz
QPID-318 : Patch supplied by Rob Godfrey - Remove hard-coding of protocol version number.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499628 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java26
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java136
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java157
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java24
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java10
12 files changed, 251 insertions, 177 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 51b585ecc5..b8db7371b0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -72,7 +72,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
// TODO - set clusterId
- session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte) 0, null));
+ session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, body.getMajor(), body.getMinor(), null));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 8cc747200f..d87821aa46 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -539,17 +539,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* NOTE: Both major and minor will be set to 0 prior to protocol initiation.
*/
- public byte getAmqpMajor()
+ public byte getProtocolMajorVersion()
{
return _major;
}
- public byte getAmqpMinor()
+ public byte getProtocolMinorVersion()
{
return _minor;
}
- public boolean amqpVersionEquals(byte major, byte minor)
+ public boolean isProtocolVersion(byte major, byte minor)
{
return _major == major && _minor == minor;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 48c05058b0..ed998b33c6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -35,6 +35,7 @@ public interface AMQProtocolSession extends AMQProtocolWriter
{
+
public static interface Task
{
public void doTask(AMQProtocolSession session) throws AMQException;
@@ -143,4 +144,8 @@ public interface AMQProtocolSession extends AMQProtocolWriter
void removeSessionCloseTask(Task task);
+ byte getProtocolMajorVersion();
+
+ byte getProtocolMinorVersion();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index c227cd5094..23a5da0a30 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -541,7 +541,7 @@ public class AMQMessage
public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag);
+ ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
@@ -585,7 +585,7 @@ public class AMQMessage
public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
{
- ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag, queueSize);
+ ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
@@ -627,11 +627,11 @@ public class AMQMessage
}
- private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag)
+ private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
BasicPublishBody pb = getPublishBody();
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, (byte) 8, (byte) 0, consumerTag,
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
deliveryTag, pb.exchange, _messageHandle.isRedelivered(),
pb.routingKey);
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
@@ -640,11 +640,13 @@ public class AMQMessage
return buf;
}
- private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize)
+ private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
BasicPublishBody pb = getPublishBody();
- AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte) 8, (byte) 0,
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
deliveryTag, pb.exchange,
queueSize,
_messageHandle.isRedelivered(),
@@ -655,9 +657,12 @@ public class AMQMessage
return buf;
}
- private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange,
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ getPublishBody().exchange,
replyCode, replyText,
getPublishBody().routingKey);
ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
@@ -669,7 +674,7 @@ public class AMQMessage
public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText);
+ ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 261efd4f3b..cc052f81df 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -480,22 +480,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+
+ // TODO: Be aware of possible changes to parameter order as versions change.
+
_protocolHandler.syncWrite(
ChannelOpenBody.createAMQFrame(channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(),
null), // outOfBand
ChannelOpenOkBody.class);
//todo send low water mark when protocol allows.
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ //todo Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
BasicQosBody.createAMQFrame(channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(),
false, // global
prefetchHigh, // prefetchCount
0), // prefetchSize
@@ -507,10 +507,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.debug("Issuing TxSelect for " + channelId);
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte) 8, (byte) 0), TxSelectOkBody.class);
+
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
+ _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion()),
+ TxSelectOkBody.class);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a7134736f0..4fd21b5480 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -556,10 +556,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// Commits outstanding messages sent and outstanding acknowledgements.
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxCommitOkBody.class);
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ final AMQProtocolHandler handler = getProtocolHandler();
+
+ handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion()),
+ TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -569,16 +572,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+
public void rollback() throws JMSException
{
checkTransacted();
try
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class);
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ getProtocolHandler().syncWrite(
+ TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -605,17 +607,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- _connection.getProtocolHandler().closeSession(this);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+
+ getProtocolHandler().closeSession(this);
+ // TODO: Be aware of possible changes to parameter order as versions change.
final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
new AMQShortString("JMS client closing channel")); // replyText
- _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
+ getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -634,6 +635,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ private AMQProtocolHandler getProtocolHandler()
+ {
+ return _connection.getProtocolHandler();
+ }
+
+
+ private byte getProtocolMinorVersion()
+ {
+ return getProtocolHandler().getProtocolMinorVersion();
+ }
+
+ private byte getProtocolMajorVersion()
+ {
+ return getProtocolHandler().getProtocolMajorVersion();
+ }
+
+
/**
* Close all producers or consumers. This is called either in the error case or when closing the session normally.
*
@@ -818,11 +836,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
consumer.clearUnackedMessages();
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
false)); // requeue
}
@@ -934,7 +950,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotClosed();
long producerId = getNextProducerId();
BasicMessageProducer producer = new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
- AMQSession.this, _connection.getProtocolHandler(),
+ AMQSession.this, getProtocolHandler(),
producerId, immediate, mandatory, waitUntilSent);
registerProducer(producerId, producer);
return producer;
@@ -1102,7 +1118,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQDestination amqd = (AMQDestination) destination;
- final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
// TODO: construct the rawSelector from the selector string if rawSelector == null
final FieldTable ft = FieldTableFactory.newFieldTable();
//if (rawSelector != null)
@@ -1183,16 +1199,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void declareExchange(AMQShortString name, AMQShortString type)
{
- declareExchange(name, type, _connection.getProtocolHandler());
+ declareExchange(name, type, getProtocolHandler());
}
public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
null, // arguments
false, // autoDelete
false, // durable
@@ -1202,7 +1216,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, // passive
0, // ticket
type); // type
- _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
+ getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)
@@ -1212,11 +1226,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler)
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
null, // arguments
false, // autoDelete
false, // durable
@@ -1247,11 +1259,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
amqd.setQueueName(protocolHandler.generateQueueName());
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
null, // arguments
amqd.isAutoDelete(), // autoDelete
amqd.isDurable(), // durable
@@ -1267,11 +1277,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
ft, // arguments
amqd.getExchangeName(), // exchange
true, // nowait
@@ -1315,11 +1323,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
arguments, // arguments
tag, // consumerTag
consumer.isExclusive(), // exclusive
@@ -1513,17 +1519,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
try
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
false, // ifEmpty
false, // ifUnused
true, // nowait
queueName, // queue
0); // ticket
- _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+ getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
catch (AMQException e)
{
@@ -1608,18 +1612,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
queueName, // queue
routingKey); // routingKey
AMQMethodEvent response = null;
try
{
- response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
}
catch (AMQException e)
{
@@ -1672,18 +1674,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
deliveryTag, // deliveryTag
multiple); // multiple
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
}
- _connection.getProtocolHandler().writeFrame(ackFrame);
+ getProtocolHandler().writeFrame(ackFrame);
}
public int getDefaultPrefetch()
@@ -1742,7 +1742,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
AMQDestination amqd = consumer.getDestination();
- AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+ AMQProtocolHandler protocolHandler = getProtocolHandler();
declareExchange(amqd, protocolHandler);
@@ -1839,25 +1839,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void suspendChannel()
{
_logger.warn("Suspending channel");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
false); // active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+ getProtocolHandler().writeFrame(channelFlowFrame);
}
private void unsuspendChannel()
{
_logger.warn("Unsuspending channel");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
true); // active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+ getProtocolHandler().writeFrame(channelFlowFrame);
}
public void confirmConsumerCancelled(AMQShortString consumerTag)
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 815cadb74d..e0d7db61cf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -467,11 +467,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (sendClose)
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions change.
final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(),
_consumerTag, // consumerTag
false); // nowait
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
index da903e7c1d..090c77165d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index d7fc86a5b0..477a679b90 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -60,82 +60,117 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
{
ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
- try
+ byte major = (byte) body.versionMajor;
+ byte minor = (byte) body.versionMinor;
+
+ if(checkVersionOK(major, minor))
{
- // the mechanism we are going to use
- String mechanism;
- if (body.mechanisms == null)
- {
- throw new AMQException("mechanism not specified in ConnectionStart method frame");
- }
- else
- {
- mechanism = chooseMechanism(body.mechanisms);
- }
- if (mechanism == null)
- {
- throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
- }
+ protocolSession.setProtocolVersion(major, minor);
+
- byte[] saslResponse;
try
{
- SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
- null, "AMQP", "localhost",
- null,createCallbackHandler(mechanism, protocolSession));
- if (sc == null)
+ // the mechanism we are going to use
+ String mechanism;
+ if (body.mechanisms == null)
{
- throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " +
- mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " +
- " details of how to register non-standard SASL client providers.");
+ throw new AMQException("mechanism not specified in ConnectionStart method frame");
+ }
+ else
+ {
+ mechanism = chooseMechanism(body.mechanisms);
}
- protocolSession.setSaslClient(sc);
- saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
- }
- catch (SaslException e)
- {
- protocolSession.setSaslClient(null);
- throw new AMQException("Unable to create SASL client: " + e, e);
- }
- if (body.locales == null)
- {
- throw new AMQException("Locales is not defined in Connection Start method");
- }
- final String locales = new String(body.locales, "utf8");
- final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
- String selectedLocale = null;
- if (tokenizer.hasMoreTokens())
- {
- selectedLocale = tokenizer.nextToken();
+ if (mechanism == null)
+ {
+ throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
+ }
+
+ byte[] saslResponse;
+ try
+ {
+ SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
+ null, "AMQP", "localhost",
+ null,createCallbackHandler(mechanism, protocolSession));
+ if (sc == null)
+ {
+ throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " +
+ mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " +
+ " details of how to register non-standard SASL client providers.");
+ }
+ protocolSession.setSaslClient(sc);
+ saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
+ }
+ catch (SaslException e)
+ {
+ protocolSession.setSaslClient(null);
+ throw new AMQException("Unable to create SASL client: " + e, e);
+ }
+
+ if (body.locales == null)
+ {
+ throw new AMQException("Locales is not defined in Connection Start method");
+ }
+ final String locales = new String(body.locales, "utf8");
+ final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+ String selectedLocale = null;
+ if (tokenizer.hasMoreTokens())
+ {
+ selectedLocale = tokenizer.nextToken();
+ }
+ else
+ {
+ throw new AMQException("No locales sent from server, passed: " + locales);
+ }
+
+ stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+ FieldTable clientProperties = FieldTableFactory.newFieldTable();
+
+ clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID());
+ clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
+ clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
+ clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ clientProperties, // clientProperties
+ new AMQShortString(selectedLocale), // locale
+ new AMQShortString(mechanism), // mechanism
+ saslResponse)); // response
}
- else
+ catch (UnsupportedEncodingException e)
{
- throw new AMQException("No locales sent from server, passed: " + locales);
+ throw new AMQException(_log, "Unable to decode data: " + e, e);
}
+ }
+ else
+ {
+ _log.error("Broker requested Protocol ["
+ + body.versionMajor
+ + "-"
+ + body.versionMinor
+ + "] which is not supported by this version of the client library");
- stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- FieldTable clientProperties = FieldTableFactory.newFieldTable();
-
- clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID());
- clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
- clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
- clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- clientProperties, // clientProperties
- new AMQShortString(selectedLocale), // locale
- new AMQShortString(mechanism), // mechanism
- saslResponse)); // response
+ protocolSession.closeProtocolSession();
}
- catch (UnsupportedEncodingException e)
+ }
+
+ private boolean checkVersionOK(byte versionMajor, byte versionMinor)
+ {
+ byte[][] supportedVersions = ProtocolVersionList.pv;
+ boolean supported = false;
+ int i = supportedVersions.length;
+ while(i-- != 0 && !supported)
{
- throw new AMQException(_log, "Unable to decode data: " + e, e);
+ supported = (supportedVersions[i][ProtocolVersionList.PROTOCOL_MAJOR] == versionMajor)
+ && (supportedVersions[i][ProtocolVersionList.PROTOCOL_MINOR] == versionMinor);
}
+
+ return supported;
}
private String getFullSystemInfo()
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index fbf195d20e..4e7f8a3032 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -95,21 +95,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
-
- // We add a proxy for the state manager so that we can substitute the state manager easily in this class.
- // We substitute the state manager when performing failover
-/* _frameListeners.add(new AMQMethodListener()
- {
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
- {
- return _stateManager.methodReceived(evt);
- }
-
- public void error(Exception e)
- {
- _stateManager.error(e);
- }
- });*/
}
public boolean isUseSSL()
@@ -152,7 +137,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void sessionOpened(IoSession session) throws Exception
{
- System.setProperty("foo", "bar");
+ //System.setProperty("foo", "bar");
}
/**
@@ -526,7 +511,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ _protocolSession.getProtocolMajorVersion(),
+ _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
@@ -622,4 +608,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_failoverState = failoverState;
}
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolSession.getProtocolMajorVersion();
+ }
+
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolSession.getProtocolMinorVersion();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b6dd05d761..2399819a07 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -93,6 +93,12 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
+ private byte _protocolMinorVersion;
+ private byte _protocolMajorVersion;
+
+
+
+
/**
* No-arg constructor for use by test subclass - has to initialise final vars
* NOT intended for use other then for test
@@ -458,4 +464,22 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
session.confirmConsumerCancelled(consumerTag);
}
+
+ public void setProtocolVersion(byte versionMajor, byte versionMinor)
+ {
+ _protocolMajorVersion = versionMajor;
+ _protocolMinorVersion = versionMinor;
+
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 3f371161c6..0cfa4eddce 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -158,4 +158,14 @@ public class MockProtocolSession implements AMQProtocolSession
{
//To change body of implemented methods use File | Settings | File Templates.
}
+
+ public byte getProtocolMajorVersion()
+ {
+ return 8; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
}