diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-03-22 13:14:42 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-03-22 13:14:42 +0000 |
commit | 3fb9be28593263e12623ce09084a230b59b81f4f (patch) | |
tree | 8de74dd781802819df0ff1ca56aaa94fa1b9b38e /java/client/src/main | |
parent | b9f9c16645933e0e2f4c6c9b58e8cd1716434467 (diff) | |
download | qpid-python-3fb9be28593263e12623ce09084a230b59b81f4f.tar.gz |
made a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/java.multi_version@521253 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
39 files changed, 1459 insertions, 1224 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 413524b6d8..1b4ae02399 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -25,7 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverSupport; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; +import org.apache.qpid.client.protocol.ProtocolOutputHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; @@ -36,6 +37,8 @@ import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.framing.AMQMethodFactory; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Connection; @@ -92,7 +95,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate * handler. */ - private AMQProtocolHandler _protocolHandler; + private AMQProtocolHandlerImpl _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap @@ -273,7 +276,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _failoverPolicy = new FailoverPolicy(connectionURL); - _protocolHandler = new AMQProtocolHandler(this); + _protocolHandler = new AMQProtocolHandlerImpl(this); // We are not currently connected _connected = false; @@ -550,26 +553,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException { + // define this here, should be poassed in + final int prefetchSize = 0; - // TODO: Be aware of possible changes to parameter order as versions change. + AMQMethodFactory methodFactory = getAMQMethodFactory(); - _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); - - //todo send low water mark when protocol allows. - //todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - false, // global - prefetchHigh, // prefetchCount - 0), // prefetchSize - BasicQosOkBody.class); + ChannelOpenBody openBody = methodFactory.createChannelOpen(); + sendCommandReceiveResponse(channelId, openBody); + AMQMethodBody qosBody = methodFactory.createMessageQos(prefetchHigh, prefetchSize); + sendCommandReceiveResponse(channelId, qosBody); if (transacted) { @@ -578,14 +570,21 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.debug("Issuing TxSelect for " + channelId); } - // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), - TxSelectOkBody.class); + TxSelectBody txSelectBody = methodFactory.createTxSelect(); + sendCommandReceiveResponse(channelId, txSelectBody); } } + private AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException + { + return getProtocolOutputHandler().sendCommandReceiveResponse(channelId, command); + } + + private AMQMethodFactory getAMQMethodFactory() + { + return getProtocolOutputHandler().getAMQMethodFactory(); + } + private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException { try @@ -934,7 +933,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _virtualHost; } - public AMQProtocolHandler getProtocolHandler() + public AMQProtocolHandlerImpl getProtocolHandler() { return _protocolHandler; } @@ -1218,4 +1217,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _taskPool.execute(task); } + + public ProtocolOutputHandler getProtocolOutputHandler() + { + return _protocolHandler.getOutputHandler(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 89f596e541..41101ff374 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -59,6 +59,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; @@ -68,41 +69,12 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.protocol.BlockingMethodFrameListener; +import org.apache.qpid.client.protocol.ProtocolOutputHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.AccessRequestBody; -import org.apache.qpid.framing.AccessRequestOkBody; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ExchangeBoundBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.TxCommitBody; -import org.apache.qpid.framing.TxCommitOkBody; -import org.apache.qpid.framing.TxRollbackBody; -import org.apache.qpid.framing.TxRollbackOkBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -197,6 +169,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _suspended; private final Object _suspensionLock = new Object(); + private static final AMQShortString CHANNEL_CLOSE_REPLY_TEXT = new AMQShortString("JMS client closing channel"); /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ @@ -271,11 +244,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (message.getDeliverBody() != null) { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag()); if (consumer == null) { - _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring..."); + _logger.warn("Received a message from queue " + message.getDeliverBody().getConsumerTag() + " without a handler - ignoring..."); _logger.warn("Consumers that exist: " + _consumers); _logger.warn("Session hashcode: " + System.identityHashCode(this)); } @@ -513,14 +486,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi i.next().acknowledgeLastDelivered(); } - // Commits outstanding messages sent and outstanding acknowledgements. - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQProtocolHandler handler = getProtocolHandler(); + TxCommitBody commitBody = getAMQMethodFactory().createTxCommit(); + sendCommandReceiveResponse(commitBody); + + - handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion()), - TxCommitOkBody.class); } catch (AMQException e) { @@ -549,8 +519,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi suspendChannel(true); } - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + TxRollbackBody rollbackBody = getAMQMethodFactory().createTxRollback(); + sendCommandReceiveResponse(rollbackBody); if (_dispatcher != null) { @@ -590,15 +560,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { getProtocolHandler().closeSession(this); - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText - - getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + ChannelCloseBody closeBody = getAMQMethodFactory().createChannelClose(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + CHANNEL_CLOSE_REPLY_TEXT); + sendCommandReceiveResponse(closeBody, timeout); + + + // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -617,21 +584,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private AMQProtocolHandler getProtocolHandler() + private AMQProtocolHandlerImpl getProtocolHandler() { return _connection.getProtocolHandler(); } - - private byte getProtocolMinorVersion() + public ProtocolOutputHandler getProtocolOutputHandler() { - return getProtocolHandler().getProtocolMinorVersion(); + return _connection.getProtocolOutputHandler(); } - private byte getProtocolMajorVersion() - { - return getProtocolHandler().getProtocolMajorVersion(); - } + /** @@ -835,14 +798,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.clearUnackedMessages(); } - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - false) // requeue - , BasicRecoverOkBody.class); + final boolean requeue = false; + AMQMethodBody recoverBody = getAMQMethodFactory().createRecover(requeue); + sendCommandReceiveResponse(recoverBody); + + + if (_dispatcher != null) { @@ -1226,136 +1187,60 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException { - declareExchange(name, type, getProtocolHandler()); + ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket()); + sendCommandReceiveResponse(exchangeDeclare); } public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - getTicket(), // ticket - type); // type - getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); - } + ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket()); + sendCommandReceiveResponse(exchangeDeclare); - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException - { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); - } - - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException - { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - getTicket(), // ticket - type); // type - - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException { - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - autoDelete, // autoDelete - durable, // durable - exclusive, // exclusive - false, // nowait - false, // passive - name, // queue - getTicket()); // ticket - - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); - + QueueDeclareBody queueDeclare = getAMQMethodFactory().createQueueDeclare(name, null, autoDelete, durable, exclusive, false ,getTicket()); + sendCommandReceiveResponse(queueDeclare); } public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - exchangeName, // exchange - false, // nowait - queueName, // queue - routingKey, // routingKey - getTicket()); // ticket - - - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + QueueBindBody queueBind = getAMQMethodFactory().createQueueBind(queueName,exchangeName,routingKey,arguments,getTicket()); + sendCommandReceiveResponse(queueBind); } /** * Declare the queue. * * @param amqd - * @param protocolHandler * * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. * * @throws AMQException */ - private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException + private AMQShortString declareQueue(AMQDestination amqd) throws AMQException { // For queues (but not topics) we generate the name in the client rather than the // server. This allows the name to be reused on failover if required. In general, // the destination indicates whether it wants a name generated or not. if (amqd.isNameRequired()) { - amqd.setQueueName(protocolHandler.generateQueueName()); + amqd.setQueueName(getProtocolHandler().generateQueueName()); } //TODO verify the destiation is valid. else throw - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - false, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - getTicket()); // ticket - - protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + createQueue(amqd.getAMQQueueName(),amqd.isAutoDelete(),amqd.isDurable(),amqd.isExclusive()); + + return amqd.getAMQQueueName(); } - private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException + private void bindQueue(AMQDestination amqd, AMQShortString queueName, FieldTable ft) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - ft, // arguments - amqd.getExchangeName(), // exchange - false, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - getTicket()); // ticket - - - protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); + bindQueue(queueName,amqd.getRoutingKey(),ft,amqd.getExchangeName()); } /** @@ -1365,8 +1250,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @return the consumer tag generated by the broker */ - private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector) throws AMQException + private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, boolean nowait, String messageSelector) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag @@ -1392,25 +1276,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - getTicket()); // ticket - if (nowait) + final boolean noAck = consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE; + + AMQMethodBody consumeBody = getAMQMethodFactory().createConsumer(tag, + queueName, + arguments, + noAck, + consumer.isExclusive(), + consumer.isNoLocal(), + getTicket()); + if(nowait) { - protocolHandler.writeFrame(jmsConsume); + sendCommand(consumeBody); } else { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + sendCommandReceiveResponse(consumeBody); } + } catch (AMQException e) { @@ -1606,15 +1489,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { try { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - getTicket()); // ticket - getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + + QueueDeleteBody deleteBody = getAMQMethodFactory().createQueueDelete(queueName, false, false, getTicket()); + sendCommandReceiveResponse(deleteBody); } catch (AMQException e) { @@ -1697,23 +1574,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - exchangeName, // exchange - queueName, // queue - routingKey); // routingKey AMQMethodEvent response = null; try { - response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + ExchangeBoundBody exchangeBoundBody = + getAMQMethodFactory().createExchangeBound(exchangeName, // exchange + queueName, // queue + routingKey); // routingKey + + + ExchangeBoundOkBody responseBody = sendCommandReceiveResponse(exchangeBoundBody, ExchangeBoundOkBody.class); + return (responseBody.getReplyCode() == 0); } catch (AMQException e) { throw new JMSAMQException(e); } - ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); - return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency + } private void checkTransacted() throws JMSException @@ -1770,13 +1647,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, - message.getBounceBody().exchange, - message.getBounceBody().routingKey, + message.getBounceBody().getExchange(), + message.getBounceBody().getRoutingKey(), message.getContentHeader(), message.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().getReplyCode()); + AMQShortString reason = message.getBounceBody().getReplyText(); _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. @@ -1812,16 +1689,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void acknowledgeMessage(long deliveryTag, boolean multiple) { - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - deliveryTag, // deliveryTag - multiple); // multiple + AMQMethodBody ackBody = getAMQMethodFactory().createAcknowledge(deliveryTag, multiple); if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } - getProtocolHandler().writeFrame(ackFrame); + sendCommand(ackBody); } public int getDefaultPrefetch() @@ -1908,17 +1781,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { AMQDestination amqd = consumer.getDestination(); - AMQProtocolHandler protocolHandler = getProtocolHandler(); - - declareExchange(amqd, protocolHandler); + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass()); - AMQShortString queueName = declareQueue(amqd, protocolHandler); + AMQShortString queueName = declareQueue(amqd); - bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); + bindQueue(amqd, queueName, consumer.getRawSelectorFieldTable()); try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); + consumeFromQueue(consumer, queueName, nowait, consumer.getMessageSelector()); } catch (JMSException e) //thrown by getMessageSelector { @@ -2019,14 +1890,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _suspended = suspend; - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - !suspend); // active + AMQMethodBody flowBody = getAMQMethodFactory().createChannelFlow(!suspend); + sendCommandReceiveResponse(flowBody); - _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); } } @@ -2118,32 +1984,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException { - getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), - getProtocolMinorVersion(), - active, - exclusive, - passive, - read, - realm, - write), - new BlockingMethodFrameListener(_channelId) - { - public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException - { - if (frame instanceof AccessRequestOkBody) - { - setTicket(((AccessRequestOkBody) frame).getTicket()); - return true; - } - else - { - return false; - } - } - }); + AccessRequestBody accessRequest = getAMQMethodFactory().createAccessRequest(active,exclusive,passive,read,realm,write); + AccessRequestOkBody okBody = getProtocolOutputHandler().sendCommandReceiveResponse(_channelId,accessRequest, AccessRequestOkBody.class ); + setTicket(okBody.getTicket()); + } + AMQMethodFactory getAMQMethodFactory() + { + return getProtocolOutputHandler().getAMQMethodFactory(); } private class SuspenderRunner implements Runnable @@ -2187,7 +2036,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) + if (consumerTag == null || message.getDeliverBody().getConsumerTag().equals(consumerTag)) { if (_logger.isTraceEnabled()) { @@ -2196,7 +2045,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi messages.remove(); - rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message.getDeliverBody().getDeliveryTag(), requeue); _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); } @@ -2209,13 +2058,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(long deliveryTag, boolean requeue) { - AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - deliveryTag, - requeue); + AMQMethodBody rejectBody = getAMQMethodFactory().createRejectBody(deliveryTag, requeue); + sendCommand(rejectBody); + + } - _connection.getProtocolHandler().writeFrame(basicRejectBody); + <T extends AMQMethodBody> T sendCommandReceiveResponse(AMQMethodBody command, Class<T> responseClass) throws AMQException + { + return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, responseClass); + } + AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException + { + return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command); + } + AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command, long timeout) throws AMQException + { + return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, timeout); } + + void sendCommand(AMQMethodBody command) + { + getProtocolOutputHandler().sendCommand(_channelId, command); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index e9b914425a..f0bea6cc90 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -33,6 +33,7 @@ import javax.jms.MessageListener; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; @@ -42,6 +43,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicCancelBody; import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQMethodFactory; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; @@ -438,16 +441,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (sendClose) { - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - _consumerTag, // consumerTag - false); // nowait - try { - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + AMQMethodBody cancelBody = getAMQMethodFactory().createConsumerCancel(_consumerTag); + sendCommandReceiveResponse(cancelBody); } catch (AMQException e) { @@ -467,6 +464,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody cancelBody) throws AMQException + { + return getSession().sendCommandReceiveResponse(cancelBody); + } + + private AMQMethodFactory getAMQMethodFactory() + { + return getSession().getAMQMethodFactory(); + } + /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has * vetoed automatic resubscription. The caller must hold the failover mutex. @@ -490,14 +497,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (debug) { - _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag); + _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().getDeliveryTag()); } try { - AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, - messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, + AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(), + messageFrame.getDeliverBody().getRedelivered(), + messageFrame.getDeliverBody().getExchange(), + messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index b01e087ce1..43b9f3569a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -38,17 +38,11 @@ import javax.jms.Topic; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.CompositeAMQDataBlock; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; +import org.apache.qpid.framing.*; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -91,7 +85,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j */ private String _mimeType; - private AMQProtocolHandler _protocolHandler; + private AMQProtocolHandlerImpl _protocolHandler; /** * True if this producer was created from a transacted session @@ -121,7 +115,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, + AMQSession session, AMQProtocolHandlerImpl protocolHandler, long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent) { _connection = connection; @@ -152,20 +146,28 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private void declareDestination(AMQDestination destination) { // Declare the exchange - // Note that the durable and internal arguments are ignored since passive is set to false - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame declare = - ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), null, // arguments - false, // autoDelete - false, // durable - destination.getExchangeName(), // exchange - false, // internal - true, // nowait - false, // passive - _session.getTicket(), // ticket - destination.getExchangeClass()); // type - _protocolHandler.writeFrame(declare); + + ExchangeDeclareBody exchangeDeclareBody = + getAMQMethodFactory().createExchangeDeclare(destination.getExchangeName(), + destination.getExchangeClass(), + _session.getTicket()); + sendCommand(exchangeDeclareBody); + + } + + private void sendCommand(AMQMethodBody command) + { + getSession().sendCommand(command); + } + + private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException + { + return getSession().sendCommandReceiveResponse(command); + } + + private AMQMethodFactory getAMQMethodFactory() + { + return getSession().getAMQMethodFactory(); } public void setDisableMessageID(boolean b) throws JMSException @@ -467,21 +469,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame publishFrame = - BasicPublishBody.createAMQFrame( - _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(), - destination.getExchangeName(), // exchange - immediate, // immediate - mandatory, // mandatory - destination.getRoutingKey(), // routingKey - _session.getTicket()); // ticket - message.prepareForSending(); ByteBuffer payload = message.getData(); - BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); + CommonContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); if (!_disableTimestamps) { @@ -501,37 +491,15 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); - final int size = (payload != null) ? payload.limit() : 0; - final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); - final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; + getSession().getProtocolOutputHandler().publishMessage(getSession().getChannelId(), + destination.getExchangeName(), + destination.getRoutingKey(), + immediate, + mandatory, + payload, + contentHeaderProperties, + getSession().getTicket()); - if (payload != null) - { - createContentBodies(payload, frames, 2, _channelId); - } - - if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) - { - _logger.debug("Sending content body frames to " + destination); - } - - // weight argument of zero indicates no child content headers, just bodies - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, - BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), 0, - contentHeaderProperties, size); - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending content header frame to " + destination); - } - - frames[0] = publishFrame; - frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - _protocolHandler.writeFrame(compositeFrame, wait); if (message != origMessage) { @@ -544,6 +512,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } + + private void checkTemporaryDestination(AMQDestination destination) throws JMSException { if (destination instanceof TemporaryDestination) @@ -564,59 +534,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } - /** - * Create content bodies. This will split a large message into numerous bodies depending on the negotiated - * maximum frame size. - * - * @param payload - * @param frames - * @param offset - * @param channelId @return the array of content bodies - */ - private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) - { - - if (frames.length == (offset + 1)) - { - frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); - } - else - { - - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - long remaining = payload.remaining(); - for (int i = offset; i < frames.length; i++) - { - payload.position((int) framePayloadMax * (i - offset)); - int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; - payload.limit(payload.position() + length); - frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); - - remaining -= length; - } - } - - } - - private int calculateContentBodyFrameCount(ByteBuffer payload) - { - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). - int frameCount; - if ((payload == null) || (payload.remaining() == 0)) - { - frameCount = 0; - } - else - { - int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; - frameCount = (int) (dataLength / framePayloadMax) + lastFrame; - } - - return frameCount; - } public void setMimeType(String mimeType) throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 844ecbe743..268456290e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import org.apache.mina.common.IoSession; import org.apache.qpid.AMQDisconnectedException; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.client.state.AMQStateManager; /** @@ -42,7 +42,7 @@ public class FailoverHandler implements Runnable private static final Logger _logger = Logger.getLogger(FailoverHandler.class); private final IoSession _session; - private AMQProtocolHandler _amqProtocolHandler; + private AMQProtocolHandlerImpl _amqProtocolHandler; /** * Used where forcing the failover host @@ -54,7 +54,7 @@ public class FailoverHandler implements Runnable */ private int _port; - public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) + public FailoverHandler(AMQProtocolHandlerImpl amqProtocolHandler, IoSession session) { _amqProtocolHandler = amqProtocolHandler; _session = session; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java new file mode 100644 index 0000000000..994e58ed03 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java @@ -0,0 +1,24 @@ +package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.amqp_0_9.ChannelCloseOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+public class ChannelCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ChannelCloseMethodHandler
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class);
+
+ private static ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler();
+
+ public static ChannelCloseMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+
+ protected ChannelCloseOkBody createChannelCloseOkBody()
+ {
+ return new ChannelCloseOkBodyImpl();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java new file mode 100644 index 0000000000..b9b7074fd2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java @@ -0,0 +1,29 @@ +package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.amqp_0_9.ConnectionCloseOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+public class ConnectionCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
+
+ private static ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler();
+
+ public static ConnectionCloseMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+ protected ConnectionCloseMethodHandler()
+ {
+ }
+
+
+ protected ConnectionCloseOkBody createConnectionCloseOkBody()
+ {
+ return new ConnectionCloseOkBodyImpl();
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java new file mode 100644 index 0000000000..859fd2b63b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java @@ -0,0 +1,28 @@ +package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+public class ConnectionSecureMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler
+{
+ private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler();
+
+ public static ConnectionSecureMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response)
+ {
+ return new ConnectionSecureOkBodyImpl(response);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java new file mode 100644 index 0000000000..e28619b378 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java @@ -0,0 +1,41 @@ +package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.amqp_0_9.ConnectionOpenBodyImpl;
+import org.apache.qpid.framing.amqp_0_9.ConnectionTuneOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+public class ConnectionTuneMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class);
+
+ private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler();
+
+ public static ConnectionTuneMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+
+ protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist)
+ {
+
+ return new ConnectionOpenBodyImpl(path,// virtualHost
+ capabilities, // capabilities
+ insist); // insist
+
+ }
+
+ protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params)
+ {
+ // Be aware of possible changes to parameter order as versions change.
+ return new ConnectionTuneOkBodyImpl(
+ params.getChannelMax(), // channelMax
+ params.getFrameMax(), // frameMax
+ params.getHeartbeat()); // heartbeat
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java index 9bd0205977..2acf3005f1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -45,10 +45,12 @@ public class BasicCancelOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { + _logger.debug("New BasicCancelOk method received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); + protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.getConsumerTag()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java index d34d6688c1..47586db2f2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -40,8 +40,9 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener return _instance; } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod()); _logger.debug("New JmsDeliver method received"); protocolSession.unprocessedMessageReceived(msg); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java index 02573c5d00..ad5a1331b0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -40,9 +40,10 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener return _instance; } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { _logger.debug("New JmsBounce method received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod()); protocolSession.unprocessedMessageReceived(msg); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java index e2b101ab79..edfd8e5110 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelClosedException; @@ -29,10 +29,10 @@ import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.amqp_8_0.ChannelCloseOkBodyImpl; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -47,21 +47,23 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener return _handler; } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { _logger.debug("ChannelClose method received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); - AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); - AMQShortString reason = method.replyText; + AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); + AMQShortString reason = method.getReplyText(); if (_logger.isDebugEnabled()) { _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); } - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor()); - protocolSession.writeFrame(frame); + protocolSession.getOutputHandler().sendCommand(evt.getChannelId(), createChannelCloseOkBody()); + + + if (errorCode != AMQConstant.REPLY_SUCCESS) { if (_logger.isDebugEnabled()) @@ -96,4 +98,9 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener } protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); } + + protected ChannelCloseOkBody createChannelCloseOkBody() + { + return new ChannelCloseOkBodyImpl(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java index 794071cc34..d51f1d9c41 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java @@ -18,11 +18,10 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.protocol.AMQMethodEvent; @@ -38,7 +37,7 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener return _instance; } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java index 1f003649c0..baa9cb4319 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java @@ -18,11 +18,10 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ChannelFlowOkBody; @@ -42,9 +41,9 @@ public class ChannelFlowOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod(); - _logger.debug("Received Channel.Flow-Ok message, active = " + method.active); + _logger.debug("Received Channel.Flow-Ok message, active = " + method.getActive()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java index 9c8e9188ec..57eaaff531 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionClosedException; @@ -31,6 +31,7 @@ import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionCloseOkBodyImpl; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -45,26 +46,31 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener return _handler; } - private ConnectionCloseMethodHandler() + protected ConnectionCloseMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { _logger.info("ConnectionClose frame received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod(); // does it matter //stateManager.changeState(AMQState.CONNECTION_CLOSING); - AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); - AMQShortString reason = method.replyText; + AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); + AMQShortString reason = method.getReplyText(); try { - // TODO: check whether channel id of zero is appropriate - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor())); + + + + ConnectionCloseOkBody closeOkBody = createConnectionCloseOkBody(); + protocolSession.getOutputHandler().sendCommand(0, closeOkBody); + + if (errorCode != AMQConstant.REPLY_SUCCESS) { @@ -97,4 +103,10 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_CLOSED); } } + + protected ConnectionCloseOkBody createConnectionCloseOkBody() + { + return new ConnectionCloseOkBodyImpl(); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java index 2e0f273c32..f4327ac748 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java @@ -18,10 +18,9 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; @@ -40,7 +39,7 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { stateManager.changeState(AMQState.CONNECTION_OPEN); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java index 866f65b384..8291b62596 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -45,12 +45,13 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { _logger.info("ConnectionRedirect frame received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod(); - String host = method.host.toString(); + String host = method.getHost().toString(); // the host is in the form hostname:port with the port being optional int portIndex = host.indexOf(':'); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java index ab6acffeaf..f5d1840f45 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; @@ -27,9 +27,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionSecureMethodHandler implements StateAwareMethodListener @@ -41,8 +41,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener return _instance; } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); SaslClient client = protocolSession.getSaslClient(); if (client == null) { @@ -54,14 +55,10 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener try { // Evaluate server challenge - byte[] response = client.evaluateChallenge(body.challenge); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), - body.getMajor(), body.getMinor(), - response); // response - protocolSession.writeFrame(responseFrame); + byte[] response = client.evaluateChallenge(body.getChallenge()); + + ConnectionSecureOkBody secureOkBody = createConnectionSecureOkBody(response); + protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),secureOkBody); } catch (SaslException e) { @@ -70,4 +67,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener } + + protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response) + { + return new ConnectionSecureOkBodyImpl(response); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java index 2aa2c1872b..10473b9751 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import java.io.UnsupportedEncodingException; import java.util.HashSet; @@ -44,7 +44,8 @@ import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.amqp_8_0.ConnectionStartOkBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionStartMethodHandler implements StateAwareMethodListener @@ -61,55 +62,49 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener private ConnectionStartMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, " + "AMQMethodEvent evt): called"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); ConnectionStartBody body = (ConnectionStartBody) evt.getMethod(); - byte major = (byte) body.versionMajor; - byte minor = (byte) body.versionMinor; - boolean versionOk = false; + ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(),(byte) body.getVersionMinor()); + // For the purposes of interop, we can make the client accept the broker's version string. // If it does, it then internally records the version as being the latest one that it understands. // It needs to do this since frame lookup is done by version. - if (Boolean.getBoolean("qpid.accept.broker.version")) + if (Boolean.getBoolean("qpid.accept.broker.version") && !pv.isSupported()) { - versionOk = true; - int lastIndex = ProtocolVersionList.pv.length - 1; - major = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MAJOR]; - minor = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MINOR]; - } - else - { - versionOk = checkVersionOK(major, minor); + + pv = ProtocolVersion.getLatestSupportedVersion(); } - if (versionOk) + if (pv.isSupported()) { - protocolSession.setProtocolVersion(major, minor); + protocolSession.setProtocolVersion(pv); try { // Used to hold the SASL mechanism to authenticate with. String mechanism; - if (body.mechanisms == null) + if (body.getMechanisms() == null) { throw new AMQException("mechanism not specified in ConnectionStart method frame"); } else { - mechanism = chooseMechanism(body.mechanisms); + mechanism = chooseMechanism(body.getMechanisms()); _log.debug("mechanism = " + mechanism); } if (mechanism == null) { - throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms)); + throw new AMQException("No supported security mechanism found, passed: " + new String(body.getMechanisms())); } byte[] saslResponse; @@ -135,12 +130,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener throw new AMQException("Unable to create SASL client: " + e, e); } - if (body.locales == null) + if (body.getLocales() == null) { throw new AMQException("Locales is not defined in Connection Start method"); } - final String locales = new String(body.locales, "utf8"); + final String locales = new String(body.getLocales(), "utf8"); final StringTokenizer tokenizer = new StringTokenizer(locales, " "); String selectedLocale = null; if (tokenizer.hasMoreTokens()) @@ -155,24 +150,19 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), + clientProperties.setString(ClientProperties.instance.getName(), protocolSession.getClientID()); - clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), + clientProperties.setString(ClientProperties.product.getName(), QpidProperties.getProductName()); - clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), + clientProperties.setString(ClientProperties.version.getName(), QpidProperties.getReleaseVersion()); - clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo()); - - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - clientProperties, // clientProperties + clientProperties.setString(ClientProperties.platform.getName(), getFullSystemInfo()); + + ConnectionStartOkBody startOkBody = createConnectionStartOkBody(clientProperties, // clientProperties new AMQShortString(selectedLocale), // locale new AMQShortString(mechanism), // mechanism - saslResponse)); // response + saslResponse); // response + protocolSession.getOutputHandler().sendCommand(0, startOkBody); } catch (UnsupportedEncodingException e) @@ -182,27 +172,19 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener } else { - _log.error("Broker requested Protocol [" + body.versionMajor + "-" + body.versionMinor + _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor() + "] which is not supported by this version of the client library"); protocolSession.closeProtocolSession(); } } - private boolean checkVersionOK(byte versionMajor, byte versionMinor) + private ConnectionStartOkBody createConnectionStartOkBody(FieldTable clientProperties, AMQShortString locale, AMQShortString mechanism, byte[] saslResponse) { - byte[][] supportedVersions = ProtocolVersionList.pv; - boolean supported = false; - int i = supportedVersions.length; - while ((i-- != 0) && !supported) - { - supported = (supportedVersions[i][ProtocolVersionList.PROTOCOL_MAJOR] == versionMajor) - && (supportedVersions[i][ProtocolVersionList.PROTOCOL_MINOR] == versionMinor); - } - - return supported; + return new ConnectionStartOkBodyImpl(clientProperties,mechanism,saslResponse,locale); } + private String getFullSystemInfo() { StringBuffer fullSystemInfo = new StringBuffer(); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java index 67f1a6519f..3eb3401e37 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -27,11 +27,12 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionOpenBodyImpl; +import org.apache.qpid.framing.amqp_8_0.ConnectionTuneOkBodyImpl; import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionTuneMethodHandler implements StateAwareMethodListener @@ -49,9 +50,10 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { _logger.debug("ConnectionTune frame received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod(); ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters(); @@ -60,36 +62,36 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener params = new ConnectionTuneParameters(); } - params.setFrameMax(frame.frameMax); - params.setChannelMax(frame.channelMax); - params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat)); + params.setFrameMax(frame.getFrameMax()); + params.setChannelMax(frame.getChannelMax()); + params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat())); protocolSession.setConnectionTuneParameters(params); stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); - protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params,frame.getMajor(), frame.getMinor())); + protocolSession.getOutputHandler().sendCommand(evt.getChannelId(), + createTuneOkBody(params)); String host = protocolSession.getAMQConnection().getVirtualHost(); AMQShortString virtualHost = new AMQShortString("/" + host); - protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true,frame.getMajor(), frame.getMinor())); + protocolSession.getOutputHandler().sendCommand(evt.getChannelId(), + createConnectionOpenBody( virtualHost, null, true)); } - protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor) + protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist) { - // Be aware of possible changes to parameter order as versions change. - return ConnectionOpenBody.createAMQFrame(channel, - major, minor, // AMQP version (major, minor) + + return new ConnectionOpenBodyImpl(path,// virtualHost capabilities, // capabilities - insist, // insist - path); // virtualHost + insist); // insist + } - protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor) + protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params) { // Be aware of possible changes to parameter order as versions change. - return ConnectionTuneOkBody.createAMQFrame(channel, - major, minor, + return new ConnectionTuneOkBodyImpl( params.getChannelMax(), // channelMax params.getFrameMax(), // frameMax params.getHeartbeat()); // heartbeat diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java index 146c705c00..0752b38aaf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java @@ -15,11 +15,10 @@ * limitations under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ExchangeBoundOkBody; @@ -42,13 +41,13 @@ public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { if (_logger.isDebugEnabled()) { ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod(); - _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " + - body.replyText); + _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.getReplyCode() + " text: " + + body.getReplyText()); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java index eaf4721445..50bf03fe76 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java @@ -15,11 +15,10 @@ * limitations under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.client.handler.amqp_8_0; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.QueueDeleteOkBody; @@ -42,12 +41,12 @@ public class QueueDeleteOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { if (_logger.isDebugEnabled()) { QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod(); - _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount); + _logger.debug("Received Queue.Delete-Ok message, message count: " + body.getMessageCount()); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 36dd4d400c..66524edce3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -121,12 +121,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSMessageID() throws JMSException { - if (getContentHeaderProperties().getMessageId() == null) + if (getContentHeaderProperties().getMessageIdAsString() == null) { getContentHeaderProperties().setMessageId("ID:" + _deliveryTag); } - return getContentHeaderProperties().getMessageId(); + return getContentHeaderProperties().getMessageIdAsString(); } public void setJMSMessageID(String messageId) throws JMSException @@ -146,7 +146,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte[] getJMSCorrelationIDAsBytes() throws JMSException { - return getContentHeaderProperties().getCorrelationId().getBytes(); + return getContentHeaderProperties().getCorrelationIdAsString().getBytes(); } public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException @@ -161,12 +161,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSCorrelationID() throws JMSException { - return getContentHeaderProperties().getCorrelationId(); + return getContentHeaderProperties().getCorrelationIdAsString(); } public Destination getJMSReplyTo() throws JMSException { - String replyToEncoding = getContentHeaderProperties().getReplyTo(); + String replyToEncoding = getContentHeaderProperties().getReplyToAsString(); if (replyToEncoding == null) { return null; @@ -250,7 +250,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSType() throws JMSException { - return getContentHeaderProperties().getType(); + return getContentHeaderProperties().getTypeAsString(); } public void setJMSType(String string) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index c05667902f..763af312f4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -116,7 +116,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text _data.limit(text.length()) ; //_data.sweep(); _data.setAutoExpand(true); - final String encoding = getContentHeaderProperties().getEncoding(); + final String encoding = getContentHeaderProperties().getEncodingAsString(); if (encoding == null) { _data.put(text.getBytes()); @@ -155,11 +155,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { return null; } - if (getContentHeaderProperties().getEncoding() != null) + if (getContentHeaderProperties().getEncodingAsString() != null) { try { - _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncoding()).newDecoder()); + _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder()); } catch (CharacterCodingException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index e02771d8f5..c2015f9e7c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -92,14 +92,14 @@ public class MessageFactoryRegistry // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over // AMQP. When the type is null, it can only be assumed that the message is a byte message. - AMQShortString contentTypeShortString = properties.getContentTypeShortString(); + AMQShortString contentTypeShortString = properties.getContentType(); contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) : contentTypeShortString; MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); if (mf == null) { - throw new AMQException("Unsupport MIME type of " + properties.getContentType()); + throw new AMQException("Unsupport MIME type of " + properties.getContentTypeAsString()); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index d0cc52271a..93c0cb5c12 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -1,630 +1,36 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.client.protocol; -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - -import org.apache.log4j.Logger; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.qpid.AMQConnectionClosedException; -import org.apache.qpid.AMQDisconnectedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.SSLConfiguration; -import org.apache.qpid.client.failover.FailoverHandler; -import org.apache.qpid.client.failover.FailoverState; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.ssl.SSLContextFactory; - +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.state.AMQStateManager; -public class AMQProtocolHandler extends IoHandlerAdapter +/** + * Created by IntelliJ IDEA. + * User: U146758 + * Date: 07-Mar-2007 + * Time: 19:40:08 + * To change this template use File | Settings | File Templates. + */ +public interface AMQProtocolHandler { - private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); - - /** - * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances - * and protocol handler instances. - */ - private AMQConnection _connection; - - /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ - private volatile AMQProtocolSession _protocolSession; - - private AMQStateManager _stateManager = new AMQStateManager(); - - private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); - - /** - * We create the failover handler when the session is created since it needs a reference to the IoSession in order - * to be able to send errors during failover back to the client application. The session won't be available in the - * case where we failing over due to a Connection.Redirect message from the broker. - */ - private FailoverHandler _failoverHandler; - - /** - * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly - * attempting failover where it is failing. - */ - private FailoverState _failoverState = FailoverState.NOT_STARTED; - - private CountDownLatch _failoverLatch; - - private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; - - public AMQProtocolHandler(AMQConnection con) - { - _connection = con; - } - - public void sessionCreated(IoSession session) throws Exception - { - _logger.debug("Protocol session created for session " + System.identityHashCode(session)); - _failoverHandler = new FailoverHandler(this, session); - - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); - - if (Boolean.getBoolean("amqj.shared_read_write_pool")) - { - session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } - else - { - session.getFilterChain().addLast("protocolFilter", pcf); - } - // we only add the SSL filter where we have an SSL connection - if (_connection.getSSLConfiguration() != null) - { - SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); - SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); - sslFilter.setUseClientMode(true); - session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); - } - - - try - { - - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - threadModel.getAsynchronousReadFilter().createNewJobForSession(session); - threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); - } - catch (RuntimeException e) - { - e.printStackTrace(); - } - - _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); - _protocolSession.init(); - } - - public void sessionOpened(IoSession session) throws Exception - { - //System.setProperty("foo", "bar"); - } - - /** - * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by - * sessionClosed() depending on whether we were trying to send data at the time of failure. - * - * @param session - * - * @throws Exception - */ - public void sessionClosed(IoSession session) throws Exception - { - if (_connection.isClosed()) - { - _logger.info("Session closed called by client"); - } - else - { - _logger.info("Session closed called with failover state currently " + _failoverState); - - //reconnetablility was introduced here so as not to disturb the client as they have made their intentions - // known through the policy settings. - - if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) - { - _logger.info("FAILOVER STARTING"); - if (_failoverState == FailoverState.NOT_STARTED) - { - _failoverState = FailoverState.IN_PROGRESS; - startFailoverThread(); - } - else - { - _logger.info("Not starting failover as state currently " + _failoverState); - } - } - else - { - _logger.info("Failover not allowed by policy."); - - if (_logger.isDebugEnabled()) - { - _logger.debug(_connection.getFailoverPolicy().toString()); - } - - if (_failoverState != FailoverState.IN_PROGRESS) - { - _logger.info("sessionClose() not allowed to failover"); - _connection.exceptionReceived( - new AMQDisconnectedException("Server closed connection and reconnection " + - "not permitted.")); - } - else - { - _logger.info("sessionClose() failover in progress"); - } - } - } - - _logger.info("Protocol Session [" + this + "] closed"); - } - - /** See {@link FailoverHandler} to see rationale for separate thread. */ - private void startFailoverThread() - { - Thread failoverThread = new Thread(_failoverHandler); - failoverThread.setName("Failover"); - // Do not inherit daemon-ness from current thread as this can be a daemon - // thread such as a AnonymousIoService thread. - failoverThread.setDaemon(false); - failoverThread.start(); - } - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); - if (IdleStatus.WRITER_IDLE.equals(status)) - { - //write heartbeat frame: - _logger.debug("Sent heartbeat"); - session.write(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - //failover: - HeartbeatDiagnostics.timeout(); - _logger.warn("Timed out while waiting for heartbeat from peer."); - session.close(); - } - } - - public void exceptionCaught(IoSession session, Throwable cause) throws Exception - { - if (_failoverState == FailoverState.NOT_STARTED) - { - //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) - if (cause instanceof AMQConnectionClosedException) - { - _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attemp failover - - sessionClosed(session); - } - } - // we reach this point if failover was attempted and failed therefore we need to let the calling app - // know since we cannot recover the situation - else if (_failoverState == FailoverState.FAILED) - { - _logger.error("Exception caught by protocol handler: " + cause, cause); - // we notify the state manager of the error in case we have any clients waiting on a state - // change. Those "waiters" will be interrupted and can handle the exception - AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToWaiters(amqe); - _connection.exceptionReceived(cause); - } - } - - /** - * There are two cases where we have other threads potentially blocking for events to be handled by this class. - * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type - * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. - * - * @param e the exception to propagate - */ - public void propagateExceptionToWaiters(Exception e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) - { - final Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener ml = (AMQMethodListener) it.next(); - ml.error(e); - } - } - } - - private static int _messageReceivedCount; - - public void messageReceived(IoSession session, Object message) throws Exception - { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; - - if (debug && (msgNumber % 1000 == 0)) - { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } - - AMQFrame frame = (AMQFrame) message; - - final AMQBody bodyFrame = frame.getBodyFrame(); - - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - - switch (bodyFrame.getFrameType()) - { - case AMQMethodBody.TYPE: - - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } - - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); - - try - { - - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); - } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - } - exceptionCaught(session, e); - } - break; - - case ContentHeaderBody.TYPE: - - _protocolSession.messageContentHeaderReceived(frame.getChannel(), - (ContentHeaderBody) bodyFrame); - break; - - case ContentBody.TYPE: - - _protocolSession.messageContentBodyReceived(frame.getChannel(), - (ContentBody) bodyFrame); - break; - - case HeartbeatBody.TYPE: - - if (debug) - { - _logger.debug("Received heartbeat"); - } - break; - - default: - - } - _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); - } - - private static int _messagesOut; - - public void messageSent(IoSession session, Object message) throws Exception - { - final long sentMessages = _messagesOut++; - - final boolean debug = _logger.isDebugEnabled(); - - if (debug && (sentMessages % 1000 == 0)) - { - _logger.debug("Sent " + _messagesOut + " protocol messages"); - } - _connection.bytesSent(session.getWrittenBytes()); - if (debug) - { - _logger.debug("Sent frame " + message); - } - } - - /* - public void addFrameListener(AMQMethodListener listener) - { - _frameListeners.add(listener); - } - - public void removeFrameListener(AMQMethodListener listener) - { - _frameListeners.remove(listener); - } - */ - public void attainState(AMQState s) throws AMQException - { - getStateManager().attainState(s); - } - - /** - * Convenience method that writes a frame to the protocol session. Equivalent to calling - * getProtocolSession().write(). - * - * @param frame the frame to write - */ - public void writeFrame(AMQDataBlock frame) - { - _protocolSession.writeFrame(frame); - } - - public void writeFrame(AMQDataBlock frame, boolean wait) - { - _protocolSession.writeFrame(frame, wait); - } - - /** - * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to - * calling getProtocolSession().write() then waiting for the response. - * - * @param frame - * @param listener the blocking listener. Note the calling thread will block. - */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) - throws AMQException - { - return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); - } - - /** - * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to - * calling getProtocolSession().write() then waiting for the response. - * - * @param frame - * @param listener the blocking listener. Note the calling thread will block. - */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener, long timeout) - throws AMQException - { - try - { - _frameListeners.add(listener); - _protocolSession.writeFrame(frame); - - AMQMethodEvent e = listener.blockForFrame(timeout); - return e; - // When control resumes before this line, a reply will have been received - // that matches the criteria defined in the blocking listener - } - catch (AMQException e) - { - throw e; - } - finally - { - // If we don't removeKey the listener then no-one will - _frameListeners.remove(listener); - } - - } - - /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException - { - return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); - } - - /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException - { - return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout); - } - - /** - * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol - * handler will ensure that messages are delivered to the consumer(s) on that session. - * - * @param channelId the channel id of the session - * @param session the session instance. - */ - public void addSessionByChannel(int channelId, AMQSession session) - { - _protocolSession.addSessionByChannel(channelId, session); - } - - /** - * Convenience method to deregister an AMQSession with the protocol handler. - * - * @param channelId then channel id of the session - */ - public void removeSessionByChannel(int channelId) - { - _protocolSession.removeSessionByChannel(channelId); - } - - public void closeSession(AMQSession session) throws AMQException - { - _protocolSession.closeSession(session); - } - - public void closeConnection() throws AMQException - { - closeConnection(-1); - } - - public void closeConnection(long timeout) throws AMQException - { - getStateManager().changeState(AMQState.CONNECTION_CLOSING); - - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, - _protocolSession.getProtocolMajorVersion(), - _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection.")); // replyText - - try - { - syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _protocolSession.closeProtocolSession(); - } - catch (AMQTimeoutException e) - { - _protocolSession.closeProtocolSession(false); - } - - - } - - /** @return the number of bytes read from this protocol session */ - public long getReadBytes() - { - return _protocolSession.getIoSession().getReadBytes(); - } - - /** @return the number of bytes written to this protocol session */ - public long getWrittenBytes() - { - return _protocolSession.getIoSession().getWrittenBytes(); - } - - public void failover(String host, int port) - { - _failoverHandler.setHost(host); - _failoverHandler.setPort(port); - // see javadoc for FailoverHandler to see rationale for separate thread - startFailoverThread(); - } - - public void blockUntilNotFailingOver() throws InterruptedException - { - if (_failoverLatch != null) - { - _failoverLatch.await(); - } - } - - public AMQShortString generateQueueName() - { - return _protocolSession.generateQueueName(); - } - - public CountDownLatch getFailoverLatch() - { - return _failoverLatch; - } - - public void setFailoverLatch(CountDownLatch failoverLatch) - { - _failoverLatch = failoverLatch; - } - - public AMQConnection getConnection() - { - return _connection; - } - - public AMQStateManager getStateManager() - { - return _stateManager; - } + void writeFrame(AMQDataBlock frame); - public void setStateManager(AMQStateManager stateManager) - { - _stateManager = stateManager; - _protocolSession.setStateManager(stateManager); - } + void closeSession(AMQSession session) throws AMQException; - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } + void closeConnection() throws AMQException; - FailoverState getFailoverState() - { - return _failoverState; - } + AMQConnection getConnection(); - public void setFailoverState(FailoverState failoverState) - { - _failoverState = failoverState; - } + AMQStateManager getStateManager(); - public byte getProtocolMajorVersion() - { - return _protocolSession.getProtocolMajorVersion(); - } + AMQProtocolSession getProtocolSession(); + ProtocolOutputHandler getOutputHandler(); - public byte getProtocolMinorVersion() - { - return _protocolSession.getProtocolMinorVersion(); - } + ProtocolVersion getProtocolVersion(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java new file mode 100644 index 0000000000..738531d5a5 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java @@ -0,0 +1,537 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.protocol; + +import java.util.Iterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.SSLConfiguration; +import org.apache.qpid.client.failover.FailoverHandler; +import org.apache.qpid.client.failover.FailoverState; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.*; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.ssl.SSLContextFactory; + + +public class AMQProtocolHandlerImpl extends IoHandlerAdapter implements AMQProtocolHandler +{ + private static final Logger _logger = Logger.getLogger(AMQProtocolHandlerImpl.class); + + /** + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances + * and protocol handler instances. + */ + private AMQConnection _connection; + + /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ + private volatile AMQProtocolSession _protocolSession; + + private AMQStateManager _stateManager = new AMQStateManager(); + + + + /** + * We create the failover handler when the session is created since it needs a reference to the IoSession in order + * to be able to send errors during failover back to the client application. The session won't be available in the + * case where we failing over due to a Connection.Redirect message from the broker. + */ + private FailoverHandler _failoverHandler; + + /** + * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly + * attempting failover where it is failing. + */ + private FailoverState _failoverState = FailoverState.NOT_STARTED; + + private CountDownLatch _failoverLatch; + + private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + private static final int CONTROL_CHANNEL = 0; + + public AMQProtocolHandlerImpl(AMQConnection con) + { + _connection = con; + } + + public void sessionCreated(IoSession session) throws Exception + { + _logger.debug("Protocol session created for session " + System.identityHashCode(session)); + _failoverHandler = new FailoverHandler(this, session); + + final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); + + if (Boolean.getBoolean("amqj.shared_read_write_pool")) + { + session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); + } + else + { + session.getFilterChain().addLast("protocolFilter", pcf); + } + // we only add the SSL filter where we have an SSL connection + if (_connection.getSSLConfiguration() != null) + { + SSLConfiguration sslConfig = _connection.getSSLConfiguration(); + SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); + sslFilter.setUseClientMode(true); + session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); + } + + + + ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); + threadModel.getAsynchronousReadFilter().createNewJobForSession(session); + threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); + + + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); + + // This starts the AMQP initiation by sending the AMQP Header + _protocolSession.init(); + } + + public void sessionOpened(IoSession session) throws Exception + { + + } + + /** + * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by + * sessionClosed() depending on whether we were trying to send data at the time of failure. + * + * @param session + * + * @throws Exception + */ + public void sessionClosed(IoSession session) throws Exception + { + if (_connection.isClosed()) + { + _logger.info("Session closed called by client"); + } + else + { + _logger.info("Session closed called with failover state currently " + _failoverState); + + //reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // known through the policy settings. + + if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) + { + _logger.info("FAILOVER STARTING"); + if (_failoverState == FailoverState.NOT_STARTED) + { + _failoverState = FailoverState.IN_PROGRESS; + startFailoverThread(); + } + else + { + _logger.info("Not starting failover as state currently " + _failoverState); + } + } + else + { + _logger.info("Failover not allowed by policy."); + + if (_logger.isDebugEnabled()) + { + _logger.debug(_connection.getFailoverPolicy().toString()); + } + + if (_failoverState != FailoverState.IN_PROGRESS) + { + _logger.info("sessionClose() not allowed to failover"); + _connection.exceptionReceived( + new AMQDisconnectedException("Server closed connection and reconnection " + + "not permitted.")); + } + else + { + _logger.info("sessionClose() failover in progress"); + } + } + } + + _logger.info("Protocol Session [" + this + "] closed"); + } + + /** See {@link FailoverHandler} to see rationale for separate thread. */ + private void startFailoverThread() + { + Thread failoverThread = new Thread(_failoverHandler); + failoverThread.setName("Failover"); + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.setDaemon(false); + failoverThread.start(); + } + + public void sessionIdle(IoSession session, IdleStatus status) throws Exception + { + _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); + if (IdleStatus.WRITER_IDLE.equals(status)) + { + //write heartbeat frame: + _logger.debug("Sent heartbeat"); + session.write(HeartbeatBody.FRAME); + HeartbeatDiagnostics.sent(); + } + else if (IdleStatus.READER_IDLE.equals(status)) + { + //failover: + HeartbeatDiagnostics.timeout(); + _logger.warn("Timed out while waiting for heartbeat from peer."); + session.close(); + } + } + + public void exceptionCaught(IoSession session, Throwable cause) throws Exception + { + if (_failoverState == FailoverState.NOT_STARTED) + { + //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) + if (cause instanceof AMQConnectionClosedException) + { + _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); + // this will attemp failover + + sessionClosed(session); + } + } + // we reach this point if failover was attempted and failed therefore we need to let the calling app + // know since we cannot recover the situation + else if (_failoverState == FailoverState.FAILED) + { + _logger.error("Exception caught by protocol handler: " + cause, cause); + // we notify the state manager of the error in case we have any clients waiting on a state + // change. Those "waiters" will be interrupted and can handle the exception + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + propagateExceptionToWaiters(amqe); + _connection.exceptionReceived(cause); + } + } + + /** + * There are two cases where we have other threads potentially blocking for events to be handled by this class. + * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type + * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. + * + * @param e the exception to propagate + */ + public void propagateExceptionToWaiters(Exception e) + { + getStateManager().error(e); + getProtocolSession().getOutputHandler().error(e); + + } + + private static int _messageReceivedCount; + + public void messageReceived(IoSession session, Object message) throws Exception + { + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; + + if (debug && (msgNumber % 1000 == CONTROL_CHANNEL)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } + + AMQFrame frame = (AMQFrame) message; + + final AMQBody bodyFrame = frame.getBodyFrame(); + + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + + switch (bodyFrame.getFrameType()) + { + case AMQMethodBodyImpl.TYPE: + + if (debug) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); + } + + final AMQMethodEvent<? extends AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); + + try + { + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + wasAnyoneInterested = getProtocolSession().getOutputHandler().methodReceived(evt) || wasAnyoneInterested; + + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + } + } + catch (AMQException e) + { + getStateManager().error(e); + getProtocolSession().getOutputHandler().error(e); + exceptionCaught(session, e); + } + break; + + case ContentHeaderBody.TYPE: + + _protocolSession.messageContentHeaderReceived(frame.getChannel(), + (ContentHeaderBody) bodyFrame); + break; + + case ContentBody.TYPE: + + _protocolSession.messageContentBodyReceived(frame.getChannel(), + (ContentBody) bodyFrame); + break; + + case HeartbeatBody.TYPE: + + if (debug) + { + _logger.debug("Received heartbeat"); + } + break; + + default: + + } + _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + } + + private static int _messagesOut; + + public void messageSent(IoSession session, Object message) throws Exception + { + final long sentMessages = _messagesOut++; + + final boolean debug = _logger.isDebugEnabled(); + + if (debug && (sentMessages % 1000 == CONTROL_CHANNEL)) + { + _logger.debug("Sent " + _messagesOut + " protocol messages"); + } + _connection.bytesSent(session.getWrittenBytes()); + if (debug) + { + _logger.debug("Sent frame " + message); + } + } + + /* + public void addFrameListener(AMQMethodListener listener) + { + _frameListeners.add(listener); + } + + public void removeFrameListener(AMQMethodListener listener) + { + _frameListeners.remove(listener); + } + */ + public void attainState(AMQState s) throws AMQException + { + getStateManager().attainState(s); + } + + /** + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). + * + * @param frame the frame to write + */ + public void writeFrame(AMQDataBlock frame) + { + _protocolSession.writeFrame(frame); + } + + + + /** + * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol + * handler will ensure that messages are delivered to the consumer(s) on that session. + * + * @param channelId the channel id of the session + * @param session the session instance. + */ + public void addSessionByChannel(int channelId, AMQSession session) + { + _protocolSession.addSessionByChannel(channelId, session); + } + + /** + * Convenience method to deregister an AMQSession with the protocol handler. + * + * @param channelId then channel id of the session + */ + public void removeSessionByChannel(int channelId) + { + _protocolSession.removeSessionByChannel(channelId); + } + + public void closeSession(AMQSession session) throws AMQException + { + _protocolSession.closeSession(session); + } + + public void closeConnection() throws AMQException + { + closeConnection(-1); + } + + public void closeConnection(long timeout) throws AMQException + { + getStateManager().changeState(AMQState.CONNECTION_CLOSING); + + + try + { + ConnectionCloseBody closeBody = getAMQMethodFactory().createConnectionClose(); + sendCommandReceiveResponse(CONTROL_CHANNEL,closeBody); + + + _protocolSession.closeProtocolSession(); + } + catch (AMQTimeoutException e) + { + _protocolSession.closeProtocolSession(false); + } + + + } + + private void sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException + { + getOutputHandler().sendCommandReceiveResponse(channelId, command); + } + + private AMQMethodFactory getAMQMethodFactory() + { + return getOutputHandler().getAMQMethodFactory(); + } + + /** @return the number of bytes read from this protocol session */ + public long getReadBytes() + { + return _protocolSession.getIoSession().getReadBytes(); + } + + /** @return the number of bytes written to this protocol session */ + public long getWrittenBytes() + { + return _protocolSession.getIoSession().getWrittenBytes(); + } + + public void failover(String host, int port) + { + _failoverHandler.setHost(host); + _failoverHandler.setPort(port); + // see javadoc for FailoverHandler to see rationale for separate thread + startFailoverThread(); + } + + public void blockUntilNotFailingOver() throws InterruptedException + { + if (_failoverLatch != null) + { + _failoverLatch.await(); + } + } + + public AMQShortString generateQueueName() + { + return _protocolSession.generateQueueName(); + } + + public CountDownLatch getFailoverLatch() + { + return _failoverLatch; + } + + public void setFailoverLatch(CountDownLatch failoverLatch) + { + _failoverLatch = failoverLatch; + } + + public AMQConnection getConnection() + { + return _connection; + } + + public AMQStateManager getStateManager() + { + return _stateManager; + } + + public void setStateManager(AMQStateManager stateManager) + { + _stateManager = stateManager; + _protocolSession.setStateManager(stateManager); + } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public ProtocolOutputHandler getOutputHandler() + { + return getProtocolSession().getOutputHandler(); + } + + FailoverState getFailoverState() + { + return _failoverState; + } + + public void setFailoverState(FailoverState failoverState) + { + _failoverState = failoverState; + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolSession.getProtocolVersion(); + } + + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 055109d3be..3c158415e0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -43,10 +43,11 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MainRegistry; import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.MainRegistry; +import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.protocol.AMQConstant; @@ -56,7 +57,7 @@ import org.apache.qpid.protocol.AMQConstant; * The underlying protocol session is still available but clients should not * use it to obtain session attributes. */ -public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession +public class AMQProtocolSession implements AMQVersionAwareProtocolSession { protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; @@ -81,7 +82,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP * The handler from which this session was created and which is used to handle protocol events. * We send failover events to the handler. */ - protected final AMQProtocolHandler _protocolHandler; + protected final AMQProtocolHandlerImpl _protocolHandler; /** * Maps from the channel id to the AMQSession that it represents. @@ -102,9 +103,10 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP protected int _queueId = 1; protected final Object _queueIdLock = new Object(); - private byte _protocolMinorVersion; - private byte _protocolMajorVersion; - private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]); + private MethodRegistry _registry = MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); + private ProtocolVersion _protocolVersion; + private ProtocolOutputHandler _outputHandler = ProtocolOutputHandlerFactory.createOutputHandler(ProtocolVersion.getLatestSupportedVersion(), this); + ; /** @@ -118,7 +120,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP _stateManager = new AMQStateManager(this); } - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) + public AMQProtocolSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection) { _protocolHandler = protocolHandler; _minaProtocolSession = protocolSession; @@ -129,7 +131,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP _stateManager = new AMQStateManager(this); } - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager) + public AMQProtocolSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager) { _protocolHandler = protocolHandler; _minaProtocolSession = protocolSession; @@ -147,11 +149,8 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP { // start the process of setting up the connection. This is the first place that // data is written to the server. - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + + _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); } public String getClientID() @@ -474,26 +473,27 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP session.confirmConsumerCancelled(consumerTag); } - public void setProtocolVersion(final byte versionMajor, final byte versionMinor) + public void setProtocolVersion(ProtocolVersion pv) { - _protocolMajorVersion = versionMajor; - _protocolMinorVersion = versionMinor; - _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); + _protocolVersion = pv; + + _registry = MethodRegistry.getMethodRegistry(pv); } - public byte getProtocolMinorVersion() + public ProtocolVersion getProtocolVersion() { - return _protocolMinorVersion; + return _protocolVersion; } - public byte getProtocolMajorVersion() + public MethodRegistry getRegistry() { - return _protocolMajorVersion; + return _registry; } - public VersionSpecificRegistry getRegistry() + public ProtocolOutputHandler getOutputHandler() { - return _registry; + return _outputHandler; } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 85f98eab69..8f1f410a15 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -23,11 +23,12 @@ package org.apache.qpid.client.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -public abstract class BlockingMethodFrameListener implements AMQMethodListener +public abstract class BlockingMethodFrameListener<T extends AMQMethodBody> implements AMQMethodListener { private volatile boolean _ready = false; @@ -43,7 +44,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener protected int _channelId; - protected AMQMethodEvent _doneEvt = null; + protected AMQMethodEvent<T> _doneEvt = null; public BlockingMethodFrameListener(int channelId) { @@ -91,7 +92,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener /** * This method is called by the thread that wants to wait for a frame. */ - public AMQMethodEvent blockForFrame(long timeout) throws AMQException + public AMQMethodEvent<T> blockForFrame(long timeout) throws AMQException { synchronized (_lock) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java new file mode 100644 index 0000000000..3db6232d41 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java @@ -0,0 +1,58 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodFactory;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Map;
+import java.util.HashMap;
+
+
+public interface ProtocolOutputHandler
+{
+
+ void sendCommand(int channelId, AMQMethodBody command);
+
+ AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException;
+ AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command, long timeout) throws AMQException;
+ <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass, long timeout) throws AMQException;
+ <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass) throws AMQException;
+
+ AMQMethodFactory getAMQMethodFactory();
+
+ void publishMessage(int channelId, AMQShortString exchangeName, AMQShortString routingKey, boolean immediate, boolean mandatory, ByteBuffer payload, CommonContentHeaderProperties contentHeaderProperties, int ticket);
+
+ <M extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<M> evt) throws Exception;
+
+ void error(Exception e);
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java new file mode 100644 index 0000000000..6cc0d8d3b5 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java @@ -0,0 +1,50 @@ +package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.client.protocol.amqp_8_0.ProtocolOutputHandler_8_0;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public abstract class ProtocolOutputHandlerFactory
+{
+ private static final Map<ProtocolVersion, ProtocolOutputHandlerFactory> _handlers =
+ new HashMap<ProtocolVersion, ProtocolOutputHandlerFactory>();
+
+ public ProtocolOutputHandlerFactory(ProtocolVersion pv)
+ {
+ _handlers.put(pv,this);
+ }
+
+ public abstract ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession);
+
+ public static ProtocolOutputHandler createOutputHandler(ProtocolVersion version, AMQProtocolSession amqProtocolSession)
+ {
+ return _handlers.get(version).newInstance(amqProtocolSession);
+ }
+
+ private static final ProtocolOutputHandlerFactory VERSION_8_0 =
+ new ProtocolOutputHandlerFactory(new ProtocolVersion((byte)8,(byte)0))
+ {
+
+ public ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession)
+ {
+ return new ProtocolOutputHandler_8_0(amqProtocolSession);
+ }
+ };
+
+ // TODO - HACK
+
+ private static final ProtocolOutputHandlerFactory VERSION_0_9 =
+ new ProtocolOutputHandlerFactory(new ProtocolVersion((byte)0,(byte)9))
+ {
+
+ public ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession)
+ {
+ return new ProtocolOutputHandler_8_0(amqProtocolSession);
+ }
+ };
+
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java b/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java new file mode 100644 index 0000000000..3654f46c1a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java @@ -0,0 +1,278 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.protocol.amqp_8_0;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.*;
+import org.apache.qpid.client.protocol.ProtocolOutputHandler;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class ProtocolOutputHandler_8_0 implements ProtocolOutputHandler
+{
+ private static final AMQMethodFactory METHOD_FACTORY = new AMQMethodFactory_8_0() ;
+
+
+ private static final Map<Class<? extends AMQMethodBody>, Class<? extends AMQMethodBody>> REQUSET_RESPONSE_METHODBODY_MAP =
+ new HashMap<Class<? extends AMQMethodBody>, Class<? extends AMQMethodBody>>();
+
+ static
+ {
+ // Basic Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(BasicCancelBody.class, BasicCancelOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(BasicConsumeBody.class, BasicConsumeOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(BasicQosBody.class, BasicQosOkBody.class);
+ // GET ???
+ REQUSET_RESPONSE_METHODBODY_MAP.put(BasicRecoverBody.class, BasicRecoverOkBody.class);
+
+ // Channel Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ChannelCloseBody.class, ChannelCloseOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ChannelFlowBody.class, ChannelFlowOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ChannelOpenBody.class, ChannelOpenOkBody.class);
+
+ // Connection Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionOpenBody.class, ConnectionOpenOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionSecureBody.class, ConnectionSecureOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionStartBody.class, ConnectionStartOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionTuneBody.class, ConnectionTuneOkBody.class);
+
+ // Exchange Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ExchangeBoundBody.class, ExchangeBoundOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ExchangeDeclareBody.class, ExchangeDeclareOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(ExchangeDeleteBody.class, ExchangeDeleteOkBody.class);
+
+ // Queue Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(QueueBindBody.class, QueueBindOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(QueueDeclareBody.class, QueueDeclareOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(QueueDeleteBody.class, QueueDeleteOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(QueuePurgeBody.class, QueuePurgeOkBody.class);
+
+ // Tx Class
+ REQUSET_RESPONSE_METHODBODY_MAP.put(TxCommitBody.class, TxCommitOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(TxRollbackBody.class, TxRollbackOkBody.class);
+ REQUSET_RESPONSE_METHODBODY_MAP.put(TxSelectBody.class, TxSelectOkBody.class);
+
+ }
+
+
+
+
+
+ private final AMQProtocolSession _session;
+ private static final long DEFAULT_TIMEOUT = 30000;
+ private final CopyOnWriteArraySet<SpecificMethodFrameListener> _frameListeners =
+ new CopyOnWriteArraySet<SpecificMethodFrameListener>();
+
+ public ProtocolOutputHandler_8_0(AMQProtocolSession amqProtocolSession)
+ {
+ _session = amqProtocolSession;
+ }
+
+
+
+
+ private void writeFrame(AMQDataBlock frame)
+ {
+ _session.writeFrame(frame);
+ }
+
+ public void sendCommand(int channelId, AMQMethodBody command)
+ {
+ _session.writeFrame(new AMQFrame(channelId,command));
+ }
+
+ public AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException
+ {
+ return sendCommandReceiveResponse(channelId, command, REQUSET_RESPONSE_METHODBODY_MAP.get(command.getClass()));
+ }
+
+ public AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command, long timeout) throws AMQException
+ {
+ return sendCommandReceiveResponse(channelId, command, REQUSET_RESPONSE_METHODBODY_MAP.get(command.getClass()), timeout);
+ }
+
+ public <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass, long timeout) throws AMQException
+ {
+ AMQFrame frame = new AMQFrame(channelId,command);
+ return writeCommandFrameAndWaitForReply(frame,
+ new SpecificMethodFrameListener<T>(channelId, responseClass), timeout);
+ }
+
+ private <T extends AMQMethodBody> T writeCommandFrameAndWaitForReply(AMQFrame frame, SpecificMethodFrameListener<T> listener, long timeout) throws AMQException
+ {
+ try
+ {
+ _frameListeners.add(listener);
+ _session.writeFrame(frame);
+
+ AMQMethodEvent<T> e = listener.blockForFrame(timeout);
+ return e.getMethod();
+ // When control resumes before this line, a reply will have been received
+ // that matches the criteria defined in the blocking listener
+ }
+ finally
+ {
+ // If we don't removeKey the listener then no-one will
+ _frameListeners.remove(listener);
+ }
+
+
+ }
+
+ public <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass) throws AMQException
+ {
+ return sendCommandReceiveResponse(channelId, command, responseClass, DEFAULT_TIMEOUT);
+ }
+
+ public AMQMethodFactory getAMQMethodFactory()
+ {
+ return METHOD_FACTORY;
+ }
+
+
+ public void publishMessage(int channelId, AMQShortString exchangeName, AMQShortString routingKey, boolean immediate, boolean mandatory, ByteBuffer payload, CommonContentHeaderProperties contentHeaderProperties, int ticket)
+ {
+ final int size = (payload != null) ? payload.limit() : 0;
+ BasicPublishBodyImpl publishBody = new BasicPublishBodyImpl(ticket, exchangeName, routingKey, mandatory, immediate);
+
+
+ final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
+ final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+
+ if (payload != null)
+ {
+ createContentBodies(payload, frames, 2, channelId);
+ }
+
+
+
+ AMQFrame contentHeaderFrame =
+ ContentHeaderBody.createAMQFrame(channelId,
+ publishBody.CLASS_ID,
+ 0, // weight
+ contentHeaderProperties,
+ size);
+
+ frames[0] = new AMQFrame(channelId,publishBody);
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+ writeFrame(compositeFrame);
+ }
+
+ public <M extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<M> evt) throws AMQException
+ {
+ boolean wasAnyoneInterested = false;
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator<SpecificMethodFrameListener> it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final SpecificMethodFrameListener listener = it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+
+ return wasAnyoneInterested;
+ }
+
+ public void error(Exception e)
+ {
+ if (!_frameListeners.isEmpty())
+ {
+ final Iterator<SpecificMethodFrameListener> it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final SpecificMethodFrameListener ml = it.next();
+ ml.error(e);
+ }
+ }
+ }
+
+
+ /**
+ * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ * maximum frame size.
+ *
+ * @param payload
+ * @param frames
+ * @param offset
+ * @param channelId @return the array of content bodies
+ */
+ private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
+ {
+
+ if (frames.length == (offset + 1))
+ {
+ frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
+ }
+ else
+ {
+
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ long remaining = payload.remaining();
+ for (int i = offset; i < frames.length; i++)
+ {
+ payload.position((int) framePayloadMax * (i - offset));
+ int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+ payload.limit(payload.position() + length);
+ frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
+
+ remaining -= length;
+ }
+ }
+
+ }
+
+ private int calculateContentBodyFrameCount(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ((payload == null) || (payload.remaining() == 0))
+ {
+ frameCount = 0;
+ }
+ else
+ {
+ int dataLength = payload.remaining();
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
+ }
+
+ return frameCount;
+ }
+
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 825baf95d1..bba1c2701c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -27,34 +27,20 @@ import java.util.concurrent.CopyOnWriteArraySet; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.handler.BasicCancelOkMethodHandler; -import org.apache.qpid.client.handler.BasicDeliverMethodHandler; -import org.apache.qpid.client.handler.BasicReturnMethodHandler; -import org.apache.qpid.client.handler.ChannelCloseMethodHandler; -import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler; -import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler; -import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; -import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; -import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; -import org.apache.qpid.client.handler.ConnectionStartMethodHandler; -import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; -import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler; -import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.BasicCancelOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.BasicDeliverMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ChannelCloseOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ChannelFlowOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionOpenOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ExchangeBoundOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.QueueDeleteOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.BasicReturnMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.*; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -62,7 +48,7 @@ import org.apache.qpid.protocol.AMQMethodListener; * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler * there is a separate state manager. */ -public class AMQStateManager implements AMQMethodListener +public class AMQStateManager { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); private AMQProtocolSession _protocolSession; @@ -178,7 +164,7 @@ public class AMQStateManager implements AMQMethodListener StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { - handler.methodReceived(this, _protocolSession, evt); + handler.methodReceived(this, evt); return true; } return false; diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java index b3932533ce..9ddc50941b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java @@ -31,6 +31,6 @@ import org.apache.qpid.protocol.AMQMethodEvent; */ public interface StateAwareMethodListener { - void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, - AMQMethodEvent evt) throws AMQException; + void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException; + } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 1c70ded62a..8a19a77776 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -22,13 +22,14 @@ package org.apache.qpid.client.state.listener; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.BlockingMethodFrameListener; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; -public class SpecificMethodFrameListener extends BlockingMethodFrameListener +public class SpecificMethodFrameListener<T extends AMQMethodBody> extends BlockingMethodFrameListener { private final Class _expectedClass; - public SpecificMethodFrameListener(int channelId, Class expectedClass) + public SpecificMethodFrameListener(int channelId, Class<T> expectedClass) { super(channelId); _expectedClass = expectedClass; diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java index 7a24d6e15a..9877cd3c37 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java @@ -22,11 +22,11 @@ package org.apache.qpid.client.transport; import java.io.IOException; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.jms.BrokerDetails; public interface ITransportConnection { - void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) + void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail) throws IOException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 04e7e40564..25d5a2cc1c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -30,7 +30,7 @@ import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; @@ -50,7 +50,7 @@ public class SocketTransportConnection implements ITransportConnection _socketConnectorFactory = socketConnectorFactory; } - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) + public void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail) throws IOException { ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 104c4b43d0..4f8126f070 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -27,7 +27,7 @@ import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.PoolingFilter; import org.apache.qpid.pool.ReferenceCountingExecutorService; @@ -43,7 +43,7 @@ public class VmPipeTransportConnection implements ITransportConnection _port = port; } - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException + public void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new VmPipeConnector(); final IoServiceConfig cfg = ioConnector.getDefaultConfig(); |