summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-03-22 13:14:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-03-22 13:14:42 +0000
commit3fb9be28593263e12623ce09084a230b59b81f4f (patch)
tree8de74dd781802819df0ff1ca56aaa94fa1b9b38e /java/client/src/main
parentb9f9c16645933e0e2f4c6c9b58e8cd1716434467 (diff)
downloadqpid-python-3fb9be28593263e12623ce09084a230b59b81f4f.tar.gz
made a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/java.multi_version@521253 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java58
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java359
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java159
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java41
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java)8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java)5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java)5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java)23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java)5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java)7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java)28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java)5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java)7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java)24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java)76
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java)36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java)9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java)7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java636
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java537
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java48
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java58
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java278
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java4
39 files changed, 1459 insertions, 1224 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 413524b6d8..1b4ae02399 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -25,7 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverSupport;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
+import org.apache.qpid.client.protocol.ProtocolOutputHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -36,6 +37,8 @@ import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.AMQMethodFactory;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.Connection;
@@ -92,7 +95,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
* handler.
*/
- private AMQProtocolHandler _protocolHandler;
+ private AMQProtocolHandlerImpl _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
@@ -273,7 +276,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_failoverPolicy = new FailoverPolicy(connectionURL);
- _protocolHandler = new AMQProtocolHandler(this);
+ _protocolHandler = new AMQProtocolHandlerImpl(this);
// We are not currently connected
_connected = false;
@@ -550,26 +553,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException
{
+ // define this here, should be poassed in
+ final int prefetchSize = 0;
- // TODO: Be aware of possible changes to parameter order as versions change.
+ AMQMethodFactory methodFactory = getAMQMethodFactory();
- _protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- null), // outOfBand
- ChannelOpenOkBody.class);
-
- //todo send low water mark when protocol allows.
- //todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
+ ChannelOpenBody openBody = methodFactory.createChannelOpen();
+ sendCommandReceiveResponse(channelId, openBody);
+ AMQMethodBody qosBody = methodFactory.createMessageQos(prefetchHigh, prefetchSize);
+ sendCommandReceiveResponse(channelId, qosBody);
if (transacted)
{
@@ -578,14 +570,21 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.debug("Issuing TxSelect for " + channelId);
}
- // TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()),
- TxSelectOkBody.class);
+ TxSelectBody txSelectBody = methodFactory.createTxSelect();
+ sendCommandReceiveResponse(channelId, txSelectBody);
}
}
+ private AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException
+ {
+ return getProtocolOutputHandler().sendCommandReceiveResponse(channelId, command);
+ }
+
+ private AMQMethodFactory getAMQMethodFactory()
+ {
+ return getProtocolOutputHandler().getAMQMethodFactory();
+ }
+
private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException
{
try
@@ -934,7 +933,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _virtualHost;
}
- public AMQProtocolHandler getProtocolHandler()
+ public AMQProtocolHandlerImpl getProtocolHandler()
{
return _protocolHandler;
}
@@ -1218,4 +1217,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_taskPool.execute(task);
}
+
+ public ProtocolOutputHandler getProtocolOutputHandler()
+ {
+ return _protocolHandler.getOutputHandler();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 89f596e541..41101ff374 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -59,6 +59,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
@@ -68,41 +69,12 @@ import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
+import org.apache.qpid.client.protocol.ProtocolOutputHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AccessRequestBody;
-import org.apache.qpid.framing.AccessRequestOkBody;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -197,6 +169,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _suspended;
private final Object _suspensionLock = new Object();
+ private static final AMQShortString CHANNEL_CLOSE_REPLY_TEXT = new AMQShortString("JMS client closing channel");
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
@@ -271,11 +244,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (message.getDeliverBody() != null)
{
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag());
if (consumer == null)
{
- _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring...");
+ _logger.warn("Received a message from queue " + message.getDeliverBody().getConsumerTag() + " without a handler - ignoring...");
_logger.warn("Consumers that exist: " + _consumers);
_logger.warn("Session hashcode: " + System.identityHashCode(this));
}
@@ -513,14 +486,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
i.next().acknowledgeLastDelivered();
}
- // Commits outstanding messages sent and outstanding acknowledgements.
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQProtocolHandler handler = getProtocolHandler();
+ TxCommitBody commitBody = getAMQMethodFactory().createTxCommit();
+ sendCommandReceiveResponse(commitBody);
+
+
- handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion()),
- TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -549,8 +519,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
suspendChannel(true);
}
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ TxRollbackBody rollbackBody = getAMQMethodFactory().createTxRollback();
+ sendCommandReceiveResponse(rollbackBody);
if (_dispatcher != null)
{
@@ -590,15 +560,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
getProtocolHandler().closeSession(this);
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client closing channel")); // replyText
-
- getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+ ChannelCloseBody closeBody = getAMQMethodFactory().createChannelClose(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ CHANNEL_CLOSE_REPLY_TEXT);
+ sendCommandReceiveResponse(closeBody, timeout);
+
+
+
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -617,21 +584,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- private AMQProtocolHandler getProtocolHandler()
+ private AMQProtocolHandlerImpl getProtocolHandler()
{
return _connection.getProtocolHandler();
}
-
- private byte getProtocolMinorVersion()
+ public ProtocolOutputHandler getProtocolOutputHandler()
{
- return getProtocolHandler().getProtocolMinorVersion();
+ return _connection.getProtocolOutputHandler();
}
- private byte getProtocolMajorVersion()
- {
- return getProtocolHandler().getProtocolMajorVersion();
- }
+
/**
@@ -835,14 +798,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
consumer.clearUnackedMessages();
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- false) // requeue
- , BasicRecoverOkBody.class);
+ final boolean requeue = false;
+ AMQMethodBody recoverBody = getAMQMethodFactory().createRecover(requeue);
+ sendCommandReceiveResponse(recoverBody);
+
+
+
if (_dispatcher != null)
{
@@ -1226,136 +1187,60 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
{
- declareExchange(name, type, getProtocolHandler());
+ ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket());
+ sendCommandReceiveResponse(exchangeDeclare);
}
public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- false, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
- getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
- }
+ ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket());
+ sendCommandReceiveResponse(exchangeDeclare);
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
- {
- declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
- }
-
- private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
- {
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- false, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
-
- protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
-
public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException
{
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- autoDelete, // autoDelete
- durable, // durable
- exclusive, // exclusive
- false, // nowait
- false, // passive
- name, // queue
- getTicket()); // ticket
-
- getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
-
+ QueueDeclareBody queueDeclare = getAMQMethodFactory().createQueueDeclare(name, null, autoDelete, durable, exclusive, false ,getTicket());
+ sendCommandReceiveResponse(queueDeclare);
}
public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- arguments, // arguments
- exchangeName, // exchange
- false, // nowait
- queueName, // queue
- routingKey, // routingKey
- getTicket()); // ticket
-
-
- getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+ QueueBindBody queueBind = getAMQMethodFactory().createQueueBind(queueName,exchangeName,routingKey,arguments,getTicket());
+ sendCommandReceiveResponse(queueBind);
}
/**
* Declare the queue.
*
* @param amqd
- * @param protocolHandler
*
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
*
* @throws AMQException
*/
- private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
+ private AMQShortString declareQueue(AMQDestination amqd) throws AMQException
{
// For queues (but not topics) we generate the name in the client rather than the
// server. This allows the name to be reused on failover if required. In general,
// the destination indicates whether it wants a name generated or not.
if (amqd.isNameRequired())
{
- amqd.setQueueName(protocolHandler.generateQueueName());
+ amqd.setQueueName(getProtocolHandler().generateQueueName());
}
//TODO verify the destiation is valid. else throw
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- amqd.isAutoDelete(), // autoDelete
- amqd.isDurable(), // durable
- amqd.isExclusive(), // exclusive
- false, // nowait
- false, // passive
- amqd.getAMQQueueName(), // queue
- getTicket()); // ticket
-
- protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ createQueue(amqd.getAMQQueueName(),amqd.isAutoDelete(),amqd.isDurable(),amqd.isExclusive());
+
+
return amqd.getAMQQueueName();
}
- private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
+ private void bindQueue(AMQDestination amqd, AMQShortString queueName, FieldTable ft) throws AMQException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- ft, // arguments
- amqd.getExchangeName(), // exchange
- false, // nowait
- queueName, // queue
- amqd.getRoutingKey(), // routingKey
- getTicket()); // ticket
-
-
- protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
+ bindQueue(queueName,amqd.getRoutingKey(),ft,amqd.getExchangeName());
}
/**
@@ -1365,8 +1250,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @return the consumer tag generated by the broker
*/
- private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector) throws AMQException
+ private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
@@ -1392,25 +1276,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- arguments, // arguments
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- nowait, // nowait
- queueName, // queue
- getTicket()); // ticket
- if (nowait)
+ final boolean noAck = consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE;
+
+ AMQMethodBody consumeBody = getAMQMethodFactory().createConsumer(tag,
+ queueName,
+ arguments,
+ noAck,
+ consumer.isExclusive(),
+ consumer.isNoLocal(),
+ getTicket());
+ if(nowait)
{
- protocolHandler.writeFrame(jmsConsume);
+ sendCommand(consumeBody);
}
else
{
- protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ sendCommandReceiveResponse(consumeBody);
}
+
}
catch (AMQException e)
{
@@ -1606,15 +1489,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
try
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- false, // ifEmpty
- false, // ifUnused
- true, // nowait
- queueName, // queue
- getTicket()); // ticket
- getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+
+ QueueDeleteBody deleteBody = getAMQMethodFactory().createQueueDelete(queueName, false, false, getTicket());
+ sendCommandReceiveResponse(deleteBody);
}
catch (AMQException e)
{
@@ -1697,23 +1574,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- exchangeName, // exchange
- queueName, // queue
- routingKey); // routingKey
AMQMethodEvent response = null;
try
{
- response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ ExchangeBoundBody exchangeBoundBody =
+ getAMQMethodFactory().createExchangeBound(exchangeName, // exchange
+ queueName, // queue
+ routingKey); // routingKey
+
+
+ ExchangeBoundOkBody responseBody = sendCommandReceiveResponse(exchangeBoundBody, ExchangeBoundOkBody.class);
+ return (responseBody.getReplyCode() == 0);
}
catch (AMQException e)
{
throw new JMSAMQException(e);
}
- ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
- return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency
+
}
private void checkTransacted() throws JMSException
@@ -1770,13 +1647,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
false,
- message.getBounceBody().exchange,
- message.getBounceBody().routingKey,
+ message.getBounceBody().getExchange(),
+ message.getBounceBody().getRoutingKey(),
message.getContentHeader(),
message.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().getReplyCode());
+ AMQShortString reason = message.getBounceBody().getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
//@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
@@ -1812,16 +1689,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- deliveryTag, // deliveryTag
- multiple); // multiple
+ AMQMethodBody ackBody = getAMQMethodFactory().createAcknowledge(deliveryTag, multiple);
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
}
- getProtocolHandler().writeFrame(ackFrame);
+ sendCommand(ackBody);
}
public int getDefaultPrefetch()
@@ -1908,17 +1781,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
AMQDestination amqd = consumer.getDestination();
- AMQProtocolHandler protocolHandler = getProtocolHandler();
-
- declareExchange(amqd, protocolHandler);
+ declareExchange(amqd.getExchangeName(), amqd.getExchangeClass());
- AMQShortString queueName = declareQueue(amqd, protocolHandler);
+ AMQShortString queueName = declareQueue(amqd);
- bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
+ bindQueue(amqd, queueName, consumer.getRawSelectorFieldTable());
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+ consumeFromQueue(consumer, queueName, nowait, consumer.getMessageSelector());
}
catch (JMSException e) //thrown by getMessageSelector
{
@@ -2019,14 +1890,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_suspended = suspend;
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- !suspend); // active
+ AMQMethodBody flowBody = getAMQMethodFactory().createChannelFlow(!suspend);
+ sendCommandReceiveResponse(flowBody);
- _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
}
}
@@ -2118,32 +1984,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException
{
- getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- active,
- exclusive,
- passive,
- read,
- realm,
- write),
- new BlockingMethodFrameListener(_channelId)
- {
- public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
- {
- if (frame instanceof AccessRequestOkBody)
- {
- setTicket(((AccessRequestOkBody) frame).getTicket());
- return true;
- }
- else
- {
- return false;
- }
- }
- });
+ AccessRequestBody accessRequest = getAMQMethodFactory().createAccessRequest(active,exclusive,passive,read,realm,write);
+ AccessRequestOkBody okBody = getProtocolOutputHandler().sendCommandReceiveResponse(_channelId,accessRequest, AccessRequestOkBody.class );
+ setTicket(okBody.getTicket());
+ }
+ AMQMethodFactory getAMQMethodFactory()
+ {
+ return getProtocolOutputHandler().getAMQMethodFactory();
}
private class SuspenderRunner implements Runnable
@@ -2187,7 +2036,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag))
+ if (consumerTag == null || message.getDeliverBody().getConsumerTag().equals(consumerTag))
{
if (_logger.isTraceEnabled())
{
@@ -2196,7 +2045,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
messages.remove();
- rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ rejectMessage(message.getDeliverBody().getDeliveryTag(), requeue);
_logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
}
@@ -2209,13 +2058,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rejectMessage(long deliveryTag, boolean requeue)
{
- AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- deliveryTag,
- requeue);
+ AMQMethodBody rejectBody = getAMQMethodFactory().createRejectBody(deliveryTag, requeue);
+ sendCommand(rejectBody);
+
+ }
- _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ <T extends AMQMethodBody> T sendCommandReceiveResponse(AMQMethodBody command, Class<T> responseClass) throws AMQException
+ {
+ return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, responseClass);
+ }
+ AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException
+ {
+ return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command);
+ }
+ AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command, long timeout) throws AMQException
+ {
+ return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, timeout);
}
+
+ void sendCommand(AMQMethodBody command)
+ {
+ getProtocolOutputHandler().sendCommand(_channelId, command);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index e9b914425a..f0bea6cc90 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -33,6 +33,7 @@ import javax.jms.MessageListener;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
@@ -42,6 +43,8 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQMethodFactory;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
@@ -438,16 +441,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (sendClose)
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- _consumerTag, // consumerTag
- false); // nowait
-
try
{
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ AMQMethodBody cancelBody = getAMQMethodFactory().createConsumerCancel(_consumerTag);
+ sendCommandReceiveResponse(cancelBody);
}
catch (AMQException e)
{
@@ -467,6 +464,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+ private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody cancelBody) throws AMQException
+ {
+ return getSession().sendCommandReceiveResponse(cancelBody);
+ }
+
+ private AMQMethodFactory getAMQMethodFactory()
+ {
+ return getSession().getAMQMethodFactory();
+ }
+
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
* vetoed automatic resubscription. The caller must hold the failover mutex.
@@ -490,14 +497,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (debug)
{
- _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
+ _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().getDeliveryTag());
}
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered,
- messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey,
+ AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(),
+ messageFrame.getDeliverBody().getRedelivered(),
+ messageFrame.getDeliverBody().getExchange(),
+ messageFrame.getDeliverBody().getRoutingKey(),
messageFrame.getContentHeader(),
messageFrame.getBodies());
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index b01e087ce1..43b9f3569a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -38,17 +38,11 @@ import javax.jms.Topic;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
+import org.apache.qpid.framing.*;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -91,7 +85,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
*/
private String _mimeType;
- private AMQProtocolHandler _protocolHandler;
+ private AMQProtocolHandlerImpl _protocolHandler;
/**
* True if this producer was created from a transacted session
@@ -121,7 +115,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+ AMQSession session, AMQProtocolHandlerImpl protocolHandler, long producerId,
boolean immediate, boolean mandatory, boolean waitUntilSent)
{
_connection = connection;
@@ -152,20 +146,28 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private void declareDestination(AMQDestination destination)
{
// Declare the exchange
- // Note that the durable and internal arguments are ignored since passive is set to false
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame declare =
- ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- _session.getTicket(), // ticket
- destination.getExchangeClass()); // type
- _protocolHandler.writeFrame(declare);
+
+ ExchangeDeclareBody exchangeDeclareBody =
+ getAMQMethodFactory().createExchangeDeclare(destination.getExchangeName(),
+ destination.getExchangeClass(),
+ _session.getTicket());
+ sendCommand(exchangeDeclareBody);
+
+ }
+
+ private void sendCommand(AMQMethodBody command)
+ {
+ getSession().sendCommand(command);
+ }
+
+ private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException
+ {
+ return getSession().sendCommandReceiveResponse(command);
+ }
+
+ private AMQMethodFactory getAMQMethodFactory()
+ {
+ return getSession().getAMQMethodFactory();
}
public void setDisableMessageID(boolean b) throws JMSException
@@ -467,21 +469,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame publishFrame =
- BasicPublishBody.createAMQFrame(
- _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(),
- destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- _session.getTicket()); // ticket
-
message.prepareForSending();
ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+ CommonContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
if (!_disableTimestamps)
{
@@ -501,37 +491,15 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
- final int size = (payload != null) ? payload.limit() : 0;
- final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
- final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+ getSession().getProtocolOutputHandler().publishMessage(getSession().getChannelId(),
+ destination.getExchangeName(),
+ destination.getRoutingKey(),
+ immediate,
+ mandatory,
+ payload,
+ contentHeaderProperties,
+ getSession().getTicket());
- if (payload != null)
- {
- createContentBodies(payload, frames, 2, _channelId);
- }
-
- if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
- {
- _logger.debug("Sending content body frames to " + destination);
- }
-
- // weight argument of zero indicates no child content headers, just bodies
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()), 0,
- contentHeaderProperties, size);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending content header frame to " + destination);
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- _protocolHandler.writeFrame(compositeFrame, wait);
if (message != origMessage)
{
@@ -544,6 +512,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
}
+
+
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
if (destination instanceof TemporaryDestination)
@@ -564,59 +534,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
}
- /**
- * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- * maximum frame size.
- *
- * @param payload
- * @param frames
- * @param offset
- * @param channelId @return the array of content bodies
- */
- private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
- {
-
- if (frames.length == (offset + 1))
- {
- frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
- }
- else
- {
-
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- long remaining = payload.remaining();
- for (int i = offset; i < frames.length; i++)
- {
- payload.position((int) framePayloadMax * (i - offset));
- int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
- payload.limit(payload.position() + length);
- frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
-
- remaining -= length;
- }
- }
-
- }
-
- private int calculateContentBodyFrameCount(ByteBuffer payload)
- {
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int frameCount;
- if ((payload == null) || (payload.remaining() == 0))
- {
- frameCount = 0;
- }
- else
- {
- int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
- frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- }
-
- return frameCount;
- }
public void setMimeType(String mimeType) throws JMSException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 844ecbe743..268456290e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.log4j.Logger;
import org.apache.mina.common.IoSession;
import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.client.state.AMQStateManager;
/**
@@ -42,7 +42,7 @@ public class FailoverHandler implements Runnable
private static final Logger _logger = Logger.getLogger(FailoverHandler.class);
private final IoSession _session;
- private AMQProtocolHandler _amqProtocolHandler;
+ private AMQProtocolHandlerImpl _amqProtocolHandler;
/**
* Used where forcing the failover host
@@ -54,7 +54,7 @@ public class FailoverHandler implements Runnable
*/
private int _port;
- public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+ public FailoverHandler(AMQProtocolHandlerImpl amqProtocolHandler, IoSession session)
{
_amqProtocolHandler = amqProtocolHandler;
_session = session;
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java
new file mode 100644
index 0000000000..994e58ed03
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java
@@ -0,0 +1,24 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.amqp_0_9.ChannelCloseOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+public class ChannelCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ChannelCloseMethodHandler
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class);
+
+ private static ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler();
+
+ public static ChannelCloseMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+
+ protected ChannelCloseOkBody createChannelCloseOkBody()
+ {
+ return new ChannelCloseOkBodyImpl();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java
new file mode 100644
index 0000000000..b9b7074fd2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java
@@ -0,0 +1,29 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.amqp_0_9.ConnectionCloseOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+public class ConnectionCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
+
+ private static ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler();
+
+ public static ConnectionCloseMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+ protected ConnectionCloseMethodHandler()
+ {
+ }
+
+
+ protected ConnectionCloseOkBody createConnectionCloseOkBody()
+ {
+ return new ConnectionCloseOkBodyImpl();
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java
new file mode 100644
index 0000000000..859fd2b63b
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java
@@ -0,0 +1,28 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+public class ConnectionSecureMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler
+{
+ private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler();
+
+ public static ConnectionSecureMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response)
+ {
+ return new ConnectionSecureOkBodyImpl(response);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java
new file mode 100644
index 0000000000..e28619b378
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java
@@ -0,0 +1,41 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.amqp_0_9.ConnectionOpenBodyImpl;
+import org.apache.qpid.framing.amqp_0_9.ConnectionTuneOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+public class ConnectionTuneMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class);
+
+ private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler();
+
+ public static ConnectionTuneMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+
+ protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist)
+ {
+
+ return new ConnectionOpenBodyImpl(path,// virtualHost
+ capabilities, // capabilities
+ insist); // insist
+
+ }
+
+ protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params)
+ {
+ // Be aware of possible changes to parameter order as versions change.
+ return new ConnectionTuneOkBodyImpl(
+ params.getChannelMax(), // channelMax
+ params.getFrameMax(), // frameMax
+ params.getHeartbeat()); // heartbeat
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java
index 9bd0205977..2acf3005f1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -45,10 +45,12 @@ public class BasicCancelOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
+
_logger.debug("New BasicCancelOk method received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
- protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+ protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.getConsumerTag());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java
index d34d6688c1..47586db2f2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -40,8 +40,9 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod());
_logger.debug("New JmsDeliver method received");
protocolSession.unprocessedMessageReceived(msg);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java
index 02573c5d00..ad5a1331b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -40,9 +40,10 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.debug("New JmsBounce method received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod());
protocolSession.unprocessedMessageReceived(msg);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java
index e2b101ab79..edfd8e5110 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelClosedException;
@@ -29,10 +29,10 @@ import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.amqp_8_0.ChannelCloseOkBodyImpl;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -47,21 +47,23 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.debug("ChannelClose method received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
- protocolSession.writeFrame(frame);
+ protocolSession.getOutputHandler().sendCommand(evt.getChannelId(), createChannelCloseOkBody());
+
+
+
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
if (_logger.isDebugEnabled())
@@ -96,4 +98,9 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
}
protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
}
+
+ protected ChannelCloseOkBody createChannelCloseOkBody()
+ {
+ return new ChannelCloseOkBodyImpl();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java
index 794071cc34..d51f1d9c41 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java
@@ -18,11 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -38,7 +37,7 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java
index 1f003649c0..baa9cb4319 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java
@@ -18,11 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ChannelFlowOkBody;
@@ -42,9 +41,9 @@ public class ChannelFlowOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
- _logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
+ _logger.debug("Received Channel.Flow-Ok message, active = " + method.getActive());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java
index 9c8e9188ec..57eaaff531 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionClosedException;
@@ -31,6 +31,7 @@ import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionCloseOkBodyImpl;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -45,26 +46,31 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
return _handler;
}
- private ConnectionCloseMethodHandler()
+ protected ConnectionCloseMethodHandler()
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.info("ConnectionClose frame received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
// does it matter
//stateManager.changeState(AMQState.CONNECTION_CLOSING);
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
try
{
- // TODO: check whether channel id of zero is appropriate
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor()));
+
+
+
+ ConnectionCloseOkBody closeOkBody = createConnectionCloseOkBody();
+ protocolSession.getOutputHandler().sendCommand(0, closeOkBody);
+
+
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
@@ -97,4 +103,10 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
}
+
+ protected ConnectionCloseOkBody createConnectionCloseOkBody()
+ {
+ return new ConnectionCloseOkBodyImpl();
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java
index 2e0f273c32..f4327ac748 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java
@@ -18,10 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -40,7 +39,7 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
stateManager.changeState(AMQState.CONNECTION_OPEN);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java
index 866f65b384..8291b62596 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -45,12 +45,13 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.info("ConnectionRedirect frame received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
- String host = method.host.toString();
+ String host = method.getHost().toString();
// the host is in the form hostname:port with the port being optional
int portIndex = host.indexOf(':');
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java
index ab6acffeaf..f5d1840f45 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
@@ -27,9 +27,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionSecureMethodHandler implements StateAwareMethodListener
@@ -41,8 +41,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
SaslClient client = protocolSession.getSaslClient();
if (client == null)
{
@@ -54,14 +55,10 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
try
{
// Evaluate server challenge
- byte[] response = client.evaluateChallenge(body.challenge);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
- body.getMajor(), body.getMinor(),
- response); // response
- protocolSession.writeFrame(responseFrame);
+ byte[] response = client.evaluateChallenge(body.getChallenge());
+
+ ConnectionSecureOkBody secureOkBody = createConnectionSecureOkBody(response);
+ protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),secureOkBody);
}
catch (SaslException e)
{
@@ -70,4 +67,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
}
+
+ protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response)
+ {
+ return new ConnectionSecureOkBodyImpl(response);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java
index 2aa2c1872b..10473b9751 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
@@ -44,7 +44,8 @@ import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.amqp_8_0.ConnectionStartOkBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionStartMethodHandler implements StateAwareMethodListener
@@ -61,55 +62,49 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
private ConnectionStartMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
throws AMQException
{
_log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
+ "AMQMethodEvent evt): called");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
- byte major = (byte) body.versionMajor;
- byte minor = (byte) body.versionMinor;
- boolean versionOk = false;
+ ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(),(byte) body.getVersionMinor());
+
// For the purposes of interop, we can make the client accept the broker's version string.
// If it does, it then internally records the version as being the latest one that it understands.
// It needs to do this since frame lookup is done by version.
- if (Boolean.getBoolean("qpid.accept.broker.version"))
+ if (Boolean.getBoolean("qpid.accept.broker.version") && !pv.isSupported())
{
- versionOk = true;
- int lastIndex = ProtocolVersionList.pv.length - 1;
- major = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MAJOR];
- minor = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MINOR];
- }
- else
- {
- versionOk = checkVersionOK(major, minor);
+
+ pv = ProtocolVersion.getLatestSupportedVersion();
}
- if (versionOk)
+ if (pv.isSupported())
{
- protocolSession.setProtocolVersion(major, minor);
+ protocolSession.setProtocolVersion(pv);
try
{
// Used to hold the SASL mechanism to authenticate with.
String mechanism;
- if (body.mechanisms == null)
+ if (body.getMechanisms() == null)
{
throw new AMQException("mechanism not specified in ConnectionStart method frame");
}
else
{
- mechanism = chooseMechanism(body.mechanisms);
+ mechanism = chooseMechanism(body.getMechanisms());
_log.debug("mechanism = " + mechanism);
}
if (mechanism == null)
{
- throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
+ throw new AMQException("No supported security mechanism found, passed: " + new String(body.getMechanisms()));
}
byte[] saslResponse;
@@ -135,12 +130,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
throw new AMQException("Unable to create SASL client: " + e, e);
}
- if (body.locales == null)
+ if (body.getLocales() == null)
{
throw new AMQException("Locales is not defined in Connection Start method");
}
- final String locales = new String(body.locales, "utf8");
+ final String locales = new String(body.getLocales(), "utf8");
final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
String selectedLocale = null;
if (tokenizer.hasMoreTokens())
@@ -155,24 +150,19 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),
+ clientProperties.setString(ClientProperties.instance.getName(),
protocolSession.getClientID());
- clientProperties.setString(new AMQShortString(ClientProperties.product.toString()),
+ clientProperties.setString(ClientProperties.product.getName(),
QpidProperties.getProductName());
- clientProperties.setString(new AMQShortString(ClientProperties.version.toString()),
+ clientProperties.setString(ClientProperties.version.getName(),
QpidProperties.getReleaseVersion());
- clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
-
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- clientProperties, // clientProperties
+ clientProperties.setString(ClientProperties.platform.getName(), getFullSystemInfo());
+
+ ConnectionStartOkBody startOkBody = createConnectionStartOkBody(clientProperties, // clientProperties
new AMQShortString(selectedLocale), // locale
new AMQShortString(mechanism), // mechanism
- saslResponse)); // response
+ saslResponse); // response
+ protocolSession.getOutputHandler().sendCommand(0, startOkBody);
}
catch (UnsupportedEncodingException e)
@@ -182,27 +172,19 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
}
else
{
- _log.error("Broker requested Protocol [" + body.versionMajor + "-" + body.versionMinor
+ _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor()
+ "] which is not supported by this version of the client library");
protocolSession.closeProtocolSession();
}
}
- private boolean checkVersionOK(byte versionMajor, byte versionMinor)
+ private ConnectionStartOkBody createConnectionStartOkBody(FieldTable clientProperties, AMQShortString locale, AMQShortString mechanism, byte[] saslResponse)
{
- byte[][] supportedVersions = ProtocolVersionList.pv;
- boolean supported = false;
- int i = supportedVersions.length;
- while ((i-- != 0) && !supported)
- {
- supported = (supportedVersions[i][ProtocolVersionList.PROTOCOL_MAJOR] == versionMajor)
- && (supportedVersions[i][ProtocolVersionList.PROTOCOL_MINOR] == versionMinor);
- }
-
- return supported;
+ return new ConnectionStartOkBodyImpl(clientProperties,mechanism,saslResponse,locale);
}
+
private String getFullSystemInfo()
{
StringBuffer fullSystemInfo = new StringBuffer();
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java
index 67f1a6519f..3eb3401e37 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -27,11 +27,12 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionOpenBodyImpl;
+import org.apache.qpid.framing.amqp_8_0.ConnectionTuneOkBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionTuneMethodHandler implements StateAwareMethodListener
@@ -49,9 +50,10 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.debug("ConnectionTune frame received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters();
@@ -60,36 +62,36 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
params = new ConnectionTuneParameters();
}
- params.setFrameMax(frame.frameMax);
- params.setChannelMax(frame.channelMax);
- params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+ params.setFrameMax(frame.getFrameMax());
+ params.setChannelMax(frame.getChannelMax());
+ params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
protocolSession.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params,frame.getMajor(), frame.getMinor()));
+ protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),
+ createTuneOkBody(params));
String host = protocolSession.getAMQConnection().getVirtualHost();
AMQShortString virtualHost = new AMQShortString("/" + host);
- protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true,frame.getMajor(), frame.getMinor()));
+ protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),
+ createConnectionOpenBody( virtualHost, null, true));
}
- protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
+ protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist)
{
- // Be aware of possible changes to parameter order as versions change.
- return ConnectionOpenBody.createAMQFrame(channel,
- major, minor, // AMQP version (major, minor)
+
+ return new ConnectionOpenBodyImpl(path,// virtualHost
capabilities, // capabilities
- insist, // insist
- path); // virtualHost
+ insist); // insist
+
}
- protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor)
+ protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params)
{
// Be aware of possible changes to parameter order as versions change.
- return ConnectionTuneOkBody.createAMQFrame(channel,
- major, minor,
+ return new ConnectionTuneOkBodyImpl(
params.getChannelMax(), // channelMax
params.getFrameMax(), // frameMax
params.getHeartbeat()); // heartbeat
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java
index 146c705c00..0752b38aaf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java
@@ -15,11 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ExchangeBoundOkBody;
@@ -42,13 +41,13 @@ public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
if (_logger.isDebugEnabled())
{
ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
- _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " +
- body.replyText);
+ _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.getReplyCode() + " text: " +
+ body.getReplyText());
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java
index eaf4721445..50bf03fe76 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java
@@ -15,11 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.QueueDeleteOkBody;
@@ -42,12 +41,12 @@ public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
if (_logger.isDebugEnabled())
{
QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
- _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
+ _logger.debug("Received Queue.Delete-Ok message, message count: " + body.getMessageCount());
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 36dd4d400c..66524edce3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -121,12 +121,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public String getJMSMessageID() throws JMSException
{
- if (getContentHeaderProperties().getMessageId() == null)
+ if (getContentHeaderProperties().getMessageIdAsString() == null)
{
getContentHeaderProperties().setMessageId("ID:" + _deliveryTag);
}
- return getContentHeaderProperties().getMessageId();
+ return getContentHeaderProperties().getMessageIdAsString();
}
public void setJMSMessageID(String messageId) throws JMSException
@@ -146,7 +146,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte[] getJMSCorrelationIDAsBytes() throws JMSException
{
- return getContentHeaderProperties().getCorrelationId().getBytes();
+ return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
}
public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
@@ -161,12 +161,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public String getJMSCorrelationID() throws JMSException
{
- return getContentHeaderProperties().getCorrelationId();
+ return getContentHeaderProperties().getCorrelationIdAsString();
}
public Destination getJMSReplyTo() throws JMSException
{
- String replyToEncoding = getContentHeaderProperties().getReplyTo();
+ String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
if (replyToEncoding == null)
{
return null;
@@ -250,7 +250,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public String getJMSType() throws JMSException
{
- return getContentHeaderProperties().getType();
+ return getContentHeaderProperties().getTypeAsString();
}
public void setJMSType(String string) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index c05667902f..763af312f4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -116,7 +116,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
_data.limit(text.length()) ;
//_data.sweep();
_data.setAutoExpand(true);
- final String encoding = getContentHeaderProperties().getEncoding();
+ final String encoding = getContentHeaderProperties().getEncodingAsString();
if (encoding == null)
{
_data.put(text.getBytes());
@@ -155,11 +155,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
{
return null;
}
- if (getContentHeaderProperties().getEncoding() != null)
+ if (getContentHeaderProperties().getEncodingAsString() != null)
{
try
{
- _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncoding()).newDecoder());
+ _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder());
}
catch (CharacterCodingException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index e02771d8f5..c2015f9e7c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -92,14 +92,14 @@ public class MessageFactoryRegistry
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
- AMQShortString contentTypeShortString = properties.getContentTypeShortString();
+ AMQShortString contentTypeShortString = properties.getContentType();
contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
: contentTypeShortString;
MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
if (mf == null)
{
- throw new AMQException("Unsupport MIME type of " + properties.getContentType());
+ throw new AMQException("Unsupport MIME type of " + properties.getContentTypeAsString());
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index d0cc52271a..93c0cb5c12 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -1,630 +1,36 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.qpid.client.protocol;
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.qpid.AMQConnectionClosedException;
-import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQTimeoutException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.SSLConfiguration;
-import org.apache.qpid.client.failover.FailoverHandler;
-import org.apache.qpid.client.failover.FailoverState;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.ssl.SSLContextFactory;
-
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.state.AMQStateManager;
-public class AMQProtocolHandler extends IoHandlerAdapter
+/**
+ * Created by IntelliJ IDEA.
+ * User: U146758
+ * Date: 07-Mar-2007
+ * Time: 19:40:08
+ * To change this template use File | Settings | File Templates.
+ */
+public interface AMQProtocolHandler
{
- private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
-
- /**
- * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
- * and protocol handler instances.
- */
- private AMQConnection _connection;
-
- /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
- private volatile AMQProtocolSession _protocolSession;
-
- private AMQStateManager _stateManager = new AMQStateManager();
-
- private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
-
- /**
- * We create the failover handler when the session is created since it needs a reference to the IoSession in order
- * to be able to send errors during failover back to the client application. The session won't be available in the
- * case where we failing over due to a Connection.Redirect message from the broker.
- */
- private FailoverHandler _failoverHandler;
-
- /**
- * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
- * attempting failover where it is failing.
- */
- private FailoverState _failoverState = FailoverState.NOT_STARTED;
-
- private CountDownLatch _failoverLatch;
-
- private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
-
- public AMQProtocolHandler(AMQConnection con)
- {
- _connection = con;
- }
-
- public void sessionCreated(IoSession session) throws Exception
- {
- _logger.debug("Protocol session created for session " + System.identityHashCode(session));
- _failoverHandler = new FailoverHandler(this, session);
-
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
-
- if (Boolean.getBoolean("amqj.shared_read_write_pool"))
- {
- session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
- }
- else
- {
- session.getFilterChain().addLast("protocolFilter", pcf);
- }
- // we only add the SSL filter where we have an SSL connection
- if (_connection.getSSLConfiguration() != null)
- {
- SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
- SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
- sslFilter.setUseClientMode(true);
- session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
- }
-
-
- try
- {
-
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- e.printStackTrace();
- }
-
- _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
- _protocolSession.init();
- }
-
- public void sessionOpened(IoSession session) throws Exception
- {
- //System.setProperty("foo", "bar");
- }
-
- /**
- * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
- * sessionClosed() depending on whether we were trying to send data at the time of failure.
- *
- * @param session
- *
- * @throws Exception
- */
- public void sessionClosed(IoSession session) throws Exception
- {
- if (_connection.isClosed())
- {
- _logger.info("Session closed called by client");
- }
- else
- {
- _logger.info("Session closed called with failover state currently " + _failoverState);
-
- //reconnetablility was introduced here so as not to disturb the client as they have made their intentions
- // known through the policy settings.
-
- if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
- {
- _logger.info("FAILOVER STARTING");
- if (_failoverState == FailoverState.NOT_STARTED)
- {
- _failoverState = FailoverState.IN_PROGRESS;
- startFailoverThread();
- }
- else
- {
- _logger.info("Not starting failover as state currently " + _failoverState);
- }
- }
- else
- {
- _logger.info("Failover not allowed by policy.");
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug(_connection.getFailoverPolicy().toString());
- }
-
- if (_failoverState != FailoverState.IN_PROGRESS)
- {
- _logger.info("sessionClose() not allowed to failover");
- _connection.exceptionReceived(
- new AMQDisconnectedException("Server closed connection and reconnection " +
- "not permitted."));
- }
- else
- {
- _logger.info("sessionClose() failover in progress");
- }
- }
- }
-
- _logger.info("Protocol Session [" + this + "] closed");
- }
-
- /** See {@link FailoverHandler} to see rationale for separate thread. */
- private void startFailoverThread()
- {
- Thread failoverThread = new Thread(_failoverHandler);
- failoverThread.setName("Failover");
- // Do not inherit daemon-ness from current thread as this can be a daemon
- // thread such as a AnonymousIoService thread.
- failoverThread.setDaemon(false);
- failoverThread.start();
- }
-
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
- {
- _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- //write heartbeat frame:
- _logger.debug("Sent heartbeat");
- session.write(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- //failover:
- HeartbeatDiagnostics.timeout();
- _logger.warn("Timed out while waiting for heartbeat from peer.");
- session.close();
- }
- }
-
- public void exceptionCaught(IoSession session, Throwable cause) throws Exception
- {
- if (_failoverState == FailoverState.NOT_STARTED)
- {
- //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
- if (cause instanceof AMQConnectionClosedException)
- {
- _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attemp failover
-
- sessionClosed(session);
- }
- }
- // we reach this point if failover was attempted and failed therefore we need to let the calling app
- // know since we cannot recover the situation
- else if (_failoverState == FailoverState.FAILED)
- {
- _logger.error("Exception caught by protocol handler: " + cause, cause);
- // we notify the state manager of the error in case we have any clients waiting on a state
- // change. Those "waiters" will be interrupted and can handle the exception
- AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToWaiters(amqe);
- _connection.exceptionReceived(cause);
- }
- }
-
- /**
- * There are two cases where we have other threads potentially blocking for events to be handled by this class.
- * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
- * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
- *
- * @param e the exception to propagate
- */
- public void propagateExceptionToWaiters(Exception e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
- {
- final Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener ml = (AMQMethodListener) it.next();
- ml.error(e);
- }
- }
- }
-
- private static int _messageReceivedCount;
-
- public void messageReceived(IoSession session, Object message) throws Exception
- {
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
-
- if (debug && (msgNumber % 1000 == 0))
- {
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
- switch (bodyFrame.getFrameType())
- {
- case AMQMethodBody.TYPE:
-
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
-
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
-
- try
- {
-
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
- }
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- }
- exceptionCaught(session, e);
- }
- break;
-
- case ContentHeaderBody.TYPE:
-
- _protocolSession.messageContentHeaderReceived(frame.getChannel(),
- (ContentHeaderBody) bodyFrame);
- break;
-
- case ContentBody.TYPE:
-
- _protocolSession.messageContentBodyReceived(frame.getChannel(),
- (ContentBody) bodyFrame);
- break;
-
- case HeartbeatBody.TYPE:
-
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
- break;
-
- default:
-
- }
- _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
- }
-
- private static int _messagesOut;
-
- public void messageSent(IoSession session, Object message) throws Exception
- {
- final long sentMessages = _messagesOut++;
-
- final boolean debug = _logger.isDebugEnabled();
-
- if (debug && (sentMessages % 1000 == 0))
- {
- _logger.debug("Sent " + _messagesOut + " protocol messages");
- }
- _connection.bytesSent(session.getWrittenBytes());
- if (debug)
- {
- _logger.debug("Sent frame " + message);
- }
- }
-
- /*
- public void addFrameListener(AMQMethodListener listener)
- {
- _frameListeners.add(listener);
- }
-
- public void removeFrameListener(AMQMethodListener listener)
- {
- _frameListeners.remove(listener);
- }
- */
- public void attainState(AMQState s) throws AMQException
- {
- getStateManager().attainState(s);
- }
-
- /**
- * Convenience method that writes a frame to the protocol session. Equivalent to calling
- * getProtocolSession().write().
- *
- * @param frame the frame to write
- */
- public void writeFrame(AMQDataBlock frame)
- {
- _protocolSession.writeFrame(frame);
- }
-
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- _protocolSession.writeFrame(frame, wait);
- }
-
- /**
- * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
- * calling getProtocolSession().write() then waiting for the response.
- *
- * @param frame
- * @param listener the blocking listener. Note the calling thread will block.
- */
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener)
- throws AMQException
- {
- return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
- }
-
- /**
- * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
- * calling getProtocolSession().write() then waiting for the response.
- *
- * @param frame
- * @param listener the blocking listener. Note the calling thread will block.
- */
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener, long timeout)
- throws AMQException
- {
- try
- {
- _frameListeners.add(listener);
- _protocolSession.writeFrame(frame);
-
- AMQMethodEvent e = listener.blockForFrame(timeout);
- return e;
- // When control resumes before this line, a reply will have been received
- // that matches the criteria defined in the blocking listener
- }
- catch (AMQException e)
- {
- throw e;
- }
- finally
- {
- // If we don't removeKey the listener then no-one will
- _frameListeners.remove(listener);
- }
-
- }
-
- /** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
- {
- return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
- }
-
- /** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
- {
- return writeCommandFrameAndWaitForReply(frame,
- new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
- }
-
- /**
- * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
- * handler will ensure that messages are delivered to the consumer(s) on that session.
- *
- * @param channelId the channel id of the session
- * @param session the session instance.
- */
- public void addSessionByChannel(int channelId, AMQSession session)
- {
- _protocolSession.addSessionByChannel(channelId, session);
- }
-
- /**
- * Convenience method to deregister an AMQSession with the protocol handler.
- *
- * @param channelId then channel id of the session
- */
- public void removeSessionByChannel(int channelId)
- {
- _protocolSession.removeSessionByChannel(channelId);
- }
-
- public void closeSession(AMQSession session) throws AMQException
- {
- _protocolSession.closeSession(session);
- }
-
- public void closeConnection() throws AMQException
- {
- closeConnection(-1);
- }
-
- public void closeConnection(long timeout) throws AMQException
- {
- getStateManager().changeState(AMQState.CONNECTION_CLOSING);
-
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
- _protocolSession.getProtocolMajorVersion(),
- _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection.")); // replyText
-
- try
- {
- syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _protocolSession.closeProtocolSession();
- }
- catch (AMQTimeoutException e)
- {
- _protocolSession.closeProtocolSession(false);
- }
-
-
- }
-
- /** @return the number of bytes read from this protocol session */
- public long getReadBytes()
- {
- return _protocolSession.getIoSession().getReadBytes();
- }
-
- /** @return the number of bytes written to this protocol session */
- public long getWrittenBytes()
- {
- return _protocolSession.getIoSession().getWrittenBytes();
- }
-
- public void failover(String host, int port)
- {
- _failoverHandler.setHost(host);
- _failoverHandler.setPort(port);
- // see javadoc for FailoverHandler to see rationale for separate thread
- startFailoverThread();
- }
-
- public void blockUntilNotFailingOver() throws InterruptedException
- {
- if (_failoverLatch != null)
- {
- _failoverLatch.await();
- }
- }
-
- public AMQShortString generateQueueName()
- {
- return _protocolSession.generateQueueName();
- }
-
- public CountDownLatch getFailoverLatch()
- {
- return _failoverLatch;
- }
-
- public void setFailoverLatch(CountDownLatch failoverLatch)
- {
- _failoverLatch = failoverLatch;
- }
-
- public AMQConnection getConnection()
- {
- return _connection;
- }
-
- public AMQStateManager getStateManager()
- {
- return _stateManager;
- }
+ void writeFrame(AMQDataBlock frame);
- public void setStateManager(AMQStateManager stateManager)
- {
- _stateManager = stateManager;
- _protocolSession.setStateManager(stateManager);
- }
+ void closeSession(AMQSession session) throws AMQException;
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
+ void closeConnection() throws AMQException;
- FailoverState getFailoverState()
- {
- return _failoverState;
- }
+ AMQConnection getConnection();
- public void setFailoverState(FailoverState failoverState)
- {
- _failoverState = failoverState;
- }
+ AMQStateManager getStateManager();
- public byte getProtocolMajorVersion()
- {
- return _protocolSession.getProtocolMajorVersion();
- }
+ AMQProtocolSession getProtocolSession();
+ ProtocolOutputHandler getOutputHandler();
- public byte getProtocolMinorVersion()
- {
- return _protocolSession.getProtocolMinorVersion();
- }
+ ProtocolVersion getProtocolVersion();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java
new file mode 100644
index 0000000000..738531d5a5
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java
@@ -0,0 +1,537 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.SSLConfiguration;
+import org.apache.qpid.client.failover.FailoverHandler;
+import org.apache.qpid.client.failover.FailoverState;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+
+public class AMQProtocolHandlerImpl extends IoHandlerAdapter implements AMQProtocolHandler
+{
+ private static final Logger _logger = Logger.getLogger(AMQProtocolHandlerImpl.class);
+
+ /**
+ * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
+ * and protocol handler instances.
+ */
+ private AMQConnection _connection;
+
+ /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
+ private volatile AMQProtocolSession _protocolSession;
+
+ private AMQStateManager _stateManager = new AMQStateManager();
+
+
+
+ /**
+ * We create the failover handler when the session is created since it needs a reference to the IoSession in order
+ * to be able to send errors during failover back to the client application. The session won't be available in the
+ * case where we failing over due to a Connection.Redirect message from the broker.
+ */
+ private FailoverHandler _failoverHandler;
+
+ /**
+ * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
+ * attempting failover where it is failing.
+ */
+ private FailoverState _failoverState = FailoverState.NOT_STARTED;
+
+ private CountDownLatch _failoverLatch;
+
+ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+ private static final int CONTROL_CHANNEL = 0;
+
+ public AMQProtocolHandlerImpl(AMQConnection con)
+ {
+ _connection = con;
+ }
+
+ public void sessionCreated(IoSession session) throws Exception
+ {
+ _logger.debug("Protocol session created for session " + System.identityHashCode(session));
+ _failoverHandler = new FailoverHandler(this, session);
+
+ final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
+
+ if (Boolean.getBoolean("amqj.shared_read_write_pool"))
+ {
+ session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
+ }
+ else
+ {
+ session.getFilterChain().addLast("protocolFilter", pcf);
+ }
+ // we only add the SSL filter where we have an SSL connection
+ if (_connection.getSSLConfiguration() != null)
+ {
+ SSLConfiguration sslConfig = _connection.getSSLConfiguration();
+ SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
+ sslFilter.setUseClientMode(true);
+ session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
+ }
+
+
+
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+
+
+ _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+
+ // This starts the AMQP initiation by sending the AMQP Header
+ _protocolSession.init();
+ }
+
+ public void sessionOpened(IoSession session) throws Exception
+ {
+
+ }
+
+ /**
+ * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
+ * sessionClosed() depending on whether we were trying to send data at the time of failure.
+ *
+ * @param session
+ *
+ * @throws Exception
+ */
+ public void sessionClosed(IoSession session) throws Exception
+ {
+ if (_connection.isClosed())
+ {
+ _logger.info("Session closed called by client");
+ }
+ else
+ {
+ _logger.info("Session closed called with failover state currently " + _failoverState);
+
+ //reconnetablility was introduced here so as not to disturb the client as they have made their intentions
+ // known through the policy settings.
+
+ if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
+ {
+ _logger.info("FAILOVER STARTING");
+ if (_failoverState == FailoverState.NOT_STARTED)
+ {
+ _failoverState = FailoverState.IN_PROGRESS;
+ startFailoverThread();
+ }
+ else
+ {
+ _logger.info("Not starting failover as state currently " + _failoverState);
+ }
+ }
+ else
+ {
+ _logger.info("Failover not allowed by policy.");
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(_connection.getFailoverPolicy().toString());
+ }
+
+ if (_failoverState != FailoverState.IN_PROGRESS)
+ {
+ _logger.info("sessionClose() not allowed to failover");
+ _connection.exceptionReceived(
+ new AMQDisconnectedException("Server closed connection and reconnection " +
+ "not permitted."));
+ }
+ else
+ {
+ _logger.info("sessionClose() failover in progress");
+ }
+ }
+ }
+
+ _logger.info("Protocol Session [" + this + "] closed");
+ }
+
+ /** See {@link FailoverHandler} to see rationale for separate thread. */
+ private void startFailoverThread()
+ {
+ Thread failoverThread = new Thread(_failoverHandler);
+ failoverThread.setName("Failover");
+ // Do not inherit daemon-ness from current thread as this can be a daemon
+ // thread such as a AnonymousIoService thread.
+ failoverThread.setDaemon(false);
+ failoverThread.start();
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ {
+ _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ //write heartbeat frame:
+ _logger.debug("Sent heartbeat");
+ session.write(HeartbeatBody.FRAME);
+ HeartbeatDiagnostics.sent();
+ }
+ else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ //failover:
+ HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ session.close();
+ }
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+ {
+ if (_failoverState == FailoverState.NOT_STARTED)
+ {
+ //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
+ if (cause instanceof AMQConnectionClosedException)
+ {
+ _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
+ // this will attemp failover
+
+ sessionClosed(session);
+ }
+ }
+ // we reach this point if failover was attempted and failed therefore we need to let the calling app
+ // know since we cannot recover the situation
+ else if (_failoverState == FailoverState.FAILED)
+ {
+ _logger.error("Exception caught by protocol handler: " + cause, cause);
+ // we notify the state manager of the error in case we have any clients waiting on a state
+ // change. Those "waiters" will be interrupted and can handle the exception
+ AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+ propagateExceptionToWaiters(amqe);
+ _connection.exceptionReceived(cause);
+ }
+ }
+
+ /**
+ * There are two cases where we have other threads potentially blocking for events to be handled by this class.
+ * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
+ * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
+ *
+ * @param e the exception to propagate
+ */
+ public void propagateExceptionToWaiters(Exception e)
+ {
+ getStateManager().error(e);
+ getProtocolSession().getOutputHandler().error(e);
+
+ }
+
+ private static int _messageReceivedCount;
+
+ public void messageReceived(IoSession session, Object message) throws Exception
+ {
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
+
+ if (debug && (msgNumber % 1000 == CONTROL_CHANNEL))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ switch (bodyFrame.getFrameType())
+ {
+ case AMQMethodBodyImpl.TYPE:
+
+ if (debug)
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
+ }
+
+ final AMQMethodEvent<? extends AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+
+ try
+ {
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ wasAnyoneInterested = getProtocolSession().getOutputHandler().methodReceived(evt) || wasAnyoneInterested;
+
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ }
+ }
+ catch (AMQException e)
+ {
+ getStateManager().error(e);
+ getProtocolSession().getOutputHandler().error(e);
+ exceptionCaught(session, e);
+ }
+ break;
+
+ case ContentHeaderBody.TYPE:
+
+ _protocolSession.messageContentHeaderReceived(frame.getChannel(),
+ (ContentHeaderBody) bodyFrame);
+ break;
+
+ case ContentBody.TYPE:
+
+ _protocolSession.messageContentBodyReceived(frame.getChannel(),
+ (ContentBody) bodyFrame);
+ break;
+
+ case HeartbeatBody.TYPE:
+
+ if (debug)
+ {
+ _logger.debug("Received heartbeat");
+ }
+ break;
+
+ default:
+
+ }
+ _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ }
+
+ private static int _messagesOut;
+
+ public void messageSent(IoSession session, Object message) throws Exception
+ {
+ final long sentMessages = _messagesOut++;
+
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug && (sentMessages % 1000 == CONTROL_CHANNEL))
+ {
+ _logger.debug("Sent " + _messagesOut + " protocol messages");
+ }
+ _connection.bytesSent(session.getWrittenBytes());
+ if (debug)
+ {
+ _logger.debug("Sent frame " + message);
+ }
+ }
+
+ /*
+ public void addFrameListener(AMQMethodListener listener)
+ {
+ _frameListeners.add(listener);
+ }
+
+ public void removeFrameListener(AMQMethodListener listener)
+ {
+ _frameListeners.remove(listener);
+ }
+ */
+ public void attainState(AMQState s) throws AMQException
+ {
+ getStateManager().attainState(s);
+ }
+
+ /**
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
+ *
+ * @param frame the frame to write
+ */
+ public void writeFrame(AMQDataBlock frame)
+ {
+ _protocolSession.writeFrame(frame);
+ }
+
+
+
+ /**
+ * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
+ * handler will ensure that messages are delivered to the consumer(s) on that session.
+ *
+ * @param channelId the channel id of the session
+ * @param session the session instance.
+ */
+ public void addSessionByChannel(int channelId, AMQSession session)
+ {
+ _protocolSession.addSessionByChannel(channelId, session);
+ }
+
+ /**
+ * Convenience method to deregister an AMQSession with the protocol handler.
+ *
+ * @param channelId then channel id of the session
+ */
+ public void removeSessionByChannel(int channelId)
+ {
+ _protocolSession.removeSessionByChannel(channelId);
+ }
+
+ public void closeSession(AMQSession session) throws AMQException
+ {
+ _protocolSession.closeSession(session);
+ }
+
+ public void closeConnection() throws AMQException
+ {
+ closeConnection(-1);
+ }
+
+ public void closeConnection(long timeout) throws AMQException
+ {
+ getStateManager().changeState(AMQState.CONNECTION_CLOSING);
+
+
+ try
+ {
+ ConnectionCloseBody closeBody = getAMQMethodFactory().createConnectionClose();
+ sendCommandReceiveResponse(CONTROL_CHANNEL,closeBody);
+
+
+ _protocolSession.closeProtocolSession();
+ }
+ catch (AMQTimeoutException e)
+ {
+ _protocolSession.closeProtocolSession(false);
+ }
+
+
+ }
+
+ private void sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException
+ {
+ getOutputHandler().sendCommandReceiveResponse(channelId, command);
+ }
+
+ private AMQMethodFactory getAMQMethodFactory()
+ {
+ return getOutputHandler().getAMQMethodFactory();
+ }
+
+ /** @return the number of bytes read from this protocol session */
+ public long getReadBytes()
+ {
+ return _protocolSession.getIoSession().getReadBytes();
+ }
+
+ /** @return the number of bytes written to this protocol session */
+ public long getWrittenBytes()
+ {
+ return _protocolSession.getIoSession().getWrittenBytes();
+ }
+
+ public void failover(String host, int port)
+ {
+ _failoverHandler.setHost(host);
+ _failoverHandler.setPort(port);
+ // see javadoc for FailoverHandler to see rationale for separate thread
+ startFailoverThread();
+ }
+
+ public void blockUntilNotFailingOver() throws InterruptedException
+ {
+ if (_failoverLatch != null)
+ {
+ _failoverLatch.await();
+ }
+ }
+
+ public AMQShortString generateQueueName()
+ {
+ return _protocolSession.generateQueueName();
+ }
+
+ public CountDownLatch getFailoverLatch()
+ {
+ return _failoverLatch;
+ }
+
+ public void setFailoverLatch(CountDownLatch failoverLatch)
+ {
+ _failoverLatch = failoverLatch;
+ }
+
+ public AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ public void setStateManager(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ _protocolSession.setStateManager(stateManager);
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public ProtocolOutputHandler getOutputHandler()
+ {
+ return getProtocolSession().getOutputHandler();
+ }
+
+ FailoverState getFailoverState()
+ {
+ return _failoverState;
+ }
+
+ public void setFailoverState(FailoverState failoverState)
+ {
+ _failoverState = failoverState;
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolSession.getProtocolVersion();
+ }
+
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 055109d3be..3c158415e0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -43,10 +43,11 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MainRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.MainRegistry;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.AMQConstant;
@@ -56,7 +57,7 @@ import org.apache.qpid.protocol.AMQConstant;
* The underlying protocol session is still available but clients should not
* use it to obtain session attributes.
*/
-public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession
+public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
@@ -81,7 +82,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
* The handler from which this session was created and which is used to handle protocol events.
* We send failover events to the handler.
*/
- protected final AMQProtocolHandler _protocolHandler;
+ protected final AMQProtocolHandlerImpl _protocolHandler;
/**
* Maps from the channel id to the AMQSession that it represents.
@@ -102,9 +103,10 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
- private byte _protocolMinorVersion;
- private byte _protocolMajorVersion;
- private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
+ private MethodRegistry _registry = MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
+ private ProtocolVersion _protocolVersion;
+ private ProtocolOutputHandler _outputHandler = ProtocolOutputHandlerFactory.createOutputHandler(ProtocolVersion.getLatestSupportedVersion(), this);
+ ;
/**
@@ -118,7 +120,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
_stateManager = new AMQStateManager(this);
}
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
+ public AMQProtocolSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection)
{
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
@@ -129,7 +131,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
_stateManager = new AMQStateManager(this);
}
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
+ public AMQProtocolSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
{
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
@@ -147,11 +149,8 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be used
- here. */
- int i = pv.length - 1;
- _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+
+ _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
}
public String getClientID()
@@ -474,26 +473,27 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
session.confirmConsumerCancelled(consumerTag);
}
- public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
+ public void setProtocolVersion(ProtocolVersion pv)
{
- _protocolMajorVersion = versionMajor;
- _protocolMinorVersion = versionMinor;
- _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+ _protocolVersion = pv;
+
+ _registry = MethodRegistry.getMethodRegistry(pv);
}
- public byte getProtocolMinorVersion()
+ public ProtocolVersion getProtocolVersion()
{
- return _protocolMinorVersion;
+ return _protocolVersion;
}
- public byte getProtocolMajorVersion()
+ public MethodRegistry getRegistry()
{
- return _protocolMajorVersion;
+ return _registry;
}
- public VersionSpecificRegistry getRegistry()
+ public ProtocolOutputHandler getOutputHandler()
{
- return _registry;
+ return _outputHandler;
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 85f98eab69..8f1f410a15 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -23,11 +23,12 @@ package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-public abstract class BlockingMethodFrameListener implements AMQMethodListener
+public abstract class BlockingMethodFrameListener<T extends AMQMethodBody> implements AMQMethodListener
{
private volatile boolean _ready = false;
@@ -43,7 +44,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
protected int _channelId;
- protected AMQMethodEvent _doneEvt = null;
+ protected AMQMethodEvent<T> _doneEvt = null;
public BlockingMethodFrameListener(int channelId)
{
@@ -91,7 +92,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
/**
* This method is called by the thread that wants to wait for a frame.
*/
- public AMQMethodEvent blockForFrame(long timeout) throws AMQException
+ public AMQMethodEvent<T> blockForFrame(long timeout) throws AMQException
{
synchronized (_lock)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java
new file mode 100644
index 0000000000..3db6232d41
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodFactory;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Map;
+import java.util.HashMap;
+
+
+public interface ProtocolOutputHandler
+{
+
+ void sendCommand(int channelId, AMQMethodBody command);
+
+ AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException;
+ AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command, long timeout) throws AMQException;
+ <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass, long timeout) throws AMQException;
+ <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass) throws AMQException;
+
+ AMQMethodFactory getAMQMethodFactory();
+
+ void publishMessage(int channelId, AMQShortString exchangeName, AMQShortString routingKey, boolean immediate, boolean mandatory, ByteBuffer payload, CommonContentHeaderProperties contentHeaderProperties, int ticket);
+
+ <M extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<M> evt) throws Exception;
+
+ void error(Exception e);
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java
new file mode 100644
index 0000000000..6cc0d8d3b5
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.client.protocol.amqp_8_0.ProtocolOutputHandler_8_0;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public abstract class ProtocolOutputHandlerFactory
+{
+ private static final Map<ProtocolVersion, ProtocolOutputHandlerFactory> _handlers =
+ new HashMap<ProtocolVersion, ProtocolOutputHandlerFactory>();
+
+ public ProtocolOutputHandlerFactory(ProtocolVersion pv)
+ {
+ _handlers.put(pv,this);
+ }
+
+ public abstract ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession);
+
+ public static ProtocolOutputHandler createOutputHandler(ProtocolVersion version, AMQProtocolSession amqProtocolSession)
+ {
+ return _handlers.get(version).newInstance(amqProtocolSession);
+ }
+
+ private static final ProtocolOutputHandlerFactory VERSION_8_0 =
+ new ProtocolOutputHandlerFactory(new ProtocolVersion((byte)8,(byte)0))
+ {
+
+ public ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession)
+ {
+ return new ProtocolOutputHandler_8_0(amqProtocolSession);
+ }
+ };
+
+ // TODO - HACK
+
+ private static final ProtocolOutputHandlerFactory VERSION_0_9 =
+ new ProtocolOutputHandlerFactory(new ProtocolVersion((byte)0,(byte)9))
+ {
+
+ public ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession)
+ {
+ return new ProtocolOutputHandler_8_0(amqProtocolSession);
+ }
+ };
+
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java b/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java
new file mode 100644
index 0000000000..3654f46c1a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.protocol.amqp_8_0;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.*;
+import org.apache.qpid.client.protocol.ProtocolOutputHandler;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class ProtocolOutputHandler_8_0 implements ProtocolOutputHandler
+{
+ private static final AMQMethodFactory METHOD_FACTORY = new AMQMethodFactory_8_0() ;
+
+
+ private static final Map<Class<? extends AMQMethodBody>, Class<? extends AMQMethodBody>> REQUSET_RESPONSE_METHODBODY_MAP =
+ new HashMap<Class<? extends AMQMethodBody>, Class<? extends AMQMethodBody>>();
+
+ static
+ {
+ // Basic Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(BasicCancelBody.class, BasicCancelOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(BasicConsumeBody.class, BasicConsumeOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(BasicQosBody.class, BasicQosOkBody.class);
+ // GET ???
+ REQUSET_RESPONSE_METHODBODY_MAP.put(BasicRecoverBody.class, BasicRecoverOkBody.class);
+
+ // Channel Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ChannelCloseBody.class, ChannelCloseOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ChannelFlowBody.class, ChannelFlowOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ChannelOpenBody.class, ChannelOpenOkBody.class);
+
+ // Connection Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionOpenBody.class, ConnectionOpenOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionSecureBody.class, ConnectionSecureOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionStartBody.class, ConnectionStartOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionTuneBody.class, ConnectionTuneOkBody.class);
+
+ // Exchange Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ExchangeBoundBody.class, ExchangeBoundOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ExchangeDeclareBody.class, ExchangeDeclareOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ExchangeDeleteBody.class, ExchangeDeleteOkBody.class);
+
+ // Queue Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(QueueBindBody.class, QueueBindOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(QueueDeclareBody.class, QueueDeclareOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(QueueDeleteBody.class, QueueDeleteOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(QueuePurgeBody.class, QueuePurgeOkBody.class);
+
+ // Tx Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(TxCommitBody.class, TxCommitOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(TxRollbackBody.class, TxRollbackOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(TxSelectBody.class, TxSelectOkBody.class);
+
+ }
+
+
+
+
+
+ private final AMQProtocolSession _session;
+ private static final long DEFAULT_TIMEOUT = 30000;
+ private final CopyOnWriteArraySet<SpecificMethodFrameListener> _frameListeners =
+ new CopyOnWriteArraySet<SpecificMethodFrameListener>();
+
+ public ProtocolOutputHandler_8_0(AMQProtocolSession amqProtocolSession)
+ {
+ _session = amqProtocolSession;
+ }
+
+
+
+
+ private void writeFrame(AMQDataBlock frame)
+ {
+ _session.writeFrame(frame);
+ }
+
+ public void sendCommand(int channelId, AMQMethodBody command)
+ {
+ _session.writeFrame(new AMQFrame(channelId,command));
+ }
+
+ public AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException
+ {
+ return sendCommandReceiveResponse(channelId, command, REQUSET_RESPONSE_METHODBODY_MAP.get(command.getClass()));
+ }
+
+ public AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command, long timeout) throws AMQException
+ {
+ return sendCommandReceiveResponse(channelId, command, REQUSET_RESPONSE_METHODBODY_MAP.get(command.getClass()), timeout);
+ }
+
+ public <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass, long timeout) throws AMQException
+ {
+ AMQFrame frame = new AMQFrame(channelId,command);
+ return writeCommandFrameAndWaitForReply(frame,
+ new SpecificMethodFrameListener<T>(channelId, responseClass), timeout);
+ }
+
+ private <T extends AMQMethodBody> T writeCommandFrameAndWaitForReply(AMQFrame frame, SpecificMethodFrameListener<T> listener, long timeout) throws AMQException
+ {
+ try
+ {
+ _frameListeners.add(listener);
+ _session.writeFrame(frame);
+
+ AMQMethodEvent<T> e = listener.blockForFrame(timeout);
+ return e.getMethod();
+ // When control resumes before this line, a reply will have been received
+ // that matches the criteria defined in the blocking listener
+ }
+ finally
+ {
+ // If we don't removeKey the listener then no-one will
+ _frameListeners.remove(listener);
+ }
+
+
+ }
+
+ public <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass) throws AMQException
+ {
+ return sendCommandReceiveResponse(channelId, command, responseClass, DEFAULT_TIMEOUT);
+ }
+
+ public AMQMethodFactory getAMQMethodFactory()
+ {
+ return METHOD_FACTORY;
+ }
+
+
+ public void publishMessage(int channelId, AMQShortString exchangeName, AMQShortString routingKey, boolean immediate, boolean mandatory, ByteBuffer payload, CommonContentHeaderProperties contentHeaderProperties, int ticket)
+ {
+ final int size = (payload != null) ? payload.limit() : 0;
+ BasicPublishBodyImpl publishBody = new BasicPublishBodyImpl(ticket, exchangeName, routingKey, mandatory, immediate);
+
+
+ final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
+ final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+
+ if (payload != null)
+ {
+ createContentBodies(payload, frames, 2, channelId);
+ }
+
+
+
+ AMQFrame contentHeaderFrame =
+ ContentHeaderBody.createAMQFrame(channelId,
+ publishBody.CLASS_ID,
+ 0, // weight
+ contentHeaderProperties,
+ size);
+
+ frames[0] = new AMQFrame(channelId,publishBody);
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+ writeFrame(compositeFrame);
+ }
+
+ public <M extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<M> evt) throws AMQException
+ {
+ boolean wasAnyoneInterested = false;
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator<SpecificMethodFrameListener> it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final SpecificMethodFrameListener listener = it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+
+ return wasAnyoneInterested;
+ }
+
+ public void error(Exception e)
+ {
+ if (!_frameListeners.isEmpty())
+ {
+ final Iterator<SpecificMethodFrameListener> it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final SpecificMethodFrameListener ml = it.next();
+ ml.error(e);
+ }
+ }
+ }
+
+
+ /**
+ * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ * maximum frame size.
+ *
+ * @param payload
+ * @param frames
+ * @param offset
+ * @param channelId @return the array of content bodies
+ */
+ private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
+ {
+
+ if (frames.length == (offset + 1))
+ {
+ frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
+ }
+ else
+ {
+
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ long remaining = payload.remaining();
+ for (int i = offset; i < frames.length; i++)
+ {
+ payload.position((int) framePayloadMax * (i - offset));
+ int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+ payload.limit(payload.position() + length);
+ frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
+
+ remaining -= length;
+ }
+ }
+
+ }
+
+ private int calculateContentBodyFrameCount(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ((payload == null) || (payload.remaining() == 0))
+ {
+ frameCount = 0;
+ }
+ else
+ {
+ int dataLength = payload.remaining();
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
+ }
+
+ return frameCount;
+ }
+
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 825baf95d1..bba1c2701c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -27,34 +27,20 @@ import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.handler.BasicCancelOkMethodHandler;
-import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
-import org.apache.qpid.client.handler.BasicReturnMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
-import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler;
-import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.BasicCancelOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.BasicDeliverMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ChannelCloseOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ChannelFlowOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionOpenOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ExchangeBoundOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.QueueDeleteOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.BasicReturnMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.*;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -62,7 +48,7 @@ import org.apache.qpid.protocol.AMQMethodListener;
* The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
* there is a separate state manager.
*/
-public class AMQStateManager implements AMQMethodListener
+public class AMQStateManager
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
private AMQProtocolSession _protocolSession;
@@ -178,7 +164,7 @@ public class AMQStateManager implements AMQMethodListener
StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, _protocolSession, evt);
+ handler.methodReceived(this, evt);
return true;
}
return false;
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
index b3932533ce..9ddc50941b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
@@ -31,6 +31,6 @@ import org.apache.qpid.protocol.AMQMethodEvent;
*/
public interface StateAwareMethodListener
{
- void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession,
- AMQMethodEvent evt) throws AMQException;
+ void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException;
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
index 1c70ded62a..8a19a77776 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
@@ -22,13 +22,14 @@ package org.apache.qpid.client.state.listener;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.AMQMethodBody;
-public class SpecificMethodFrameListener extends BlockingMethodFrameListener
+public class SpecificMethodFrameListener<T extends AMQMethodBody> extends BlockingMethodFrameListener
{
private final Class _expectedClass;
- public SpecificMethodFrameListener(int channelId, Class expectedClass)
+ public SpecificMethodFrameListener(int channelId, Class<T> expectedClass)
{
super(channelId);
_expectedClass = expectedClass;
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java
index 7a24d6e15a..9877cd3c37 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java
@@ -22,11 +22,11 @@ package org.apache.qpid.client.transport;
import java.io.IOException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.jms.BrokerDetails;
public interface ITransportConnection
{
- void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail)
+ void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail)
throws IOException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index 04e7e40564..25d5a2cc1c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -30,7 +30,7 @@ import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -50,7 +50,7 @@ public class SocketTransportConnection implements ITransportConnection
_socketConnectorFactory = socketConnectorFactory;
}
- public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail)
+ public void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail)
throws IOException
{
ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index 104c4b43d0..4f8126f070 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -27,7 +27,7 @@ import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.PoolingFilter;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
@@ -43,7 +43,7 @@ public class VmPipeTransportConnection implements ITransportConnection
_port = port;
}
- public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
+ public void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail) throws IOException
{
final VmPipeConnector ioConnector = new VmPipeConnector();
final IoServiceConfig cfg = ioConnector.getDefaultConfig();