diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-04-22 18:01:08 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-04-22 18:01:08 +0000 |
commit | 09742065d5d765f6bebf5fcd3a9aad9ef325b76d (patch) | |
tree | 3117b6bfd9bb26263cc219b68b64ac090c4c2bf2 | |
parent | ea74745601a57d06633d44235e9bf61b9a88fb55 (diff) | |
download | qpid-python-09742065d5d765f6bebf5fcd3a9aad9ef325b76d.tar.gz |
QPID-832: moved more 0-8 specific code to 0-8 subclasses
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@650598 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 37 insertions, 185 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 62dcb90a86..5bb2955399 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -153,25 +153,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - private void declareDestination(AMQDestination destination) - { - - ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(), - destination.getExchangeName(), - destination.getExchangeClass(), - false, - false, - false, - false, - true, - null); - // Declare the exchange - // Note that the durable and internal arguments are ignored since passive is set to false - - AMQFrame declare = body.generateFrame(_channelId); - - _protocolHandler.writeFrame(declare); - } + abstract void declareDestination(AMQDestination destination); public void setDisableMessageID(boolean b) throws JMSException { @@ -497,81 +479,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac type = AMQDestination.UNKNOWN_TYPE; } - // message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); - - - BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(), - destination.getExchangeName(), - destination.getRoutingKey(), - mandatory, - immediate); - - AMQFrame publishFrame = body.generateFrame(_channelId); - - message.prepareForSending(); - ByteBuffer payload = message.getData(); - BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); - - if (!_disableTimestamps) - { - final long currentTime = System.currentTimeMillis(); - contentHeaderProperties.setTimestamp(currentTime); - - if (timeToLive > 0) - { - contentHeaderProperties.setExpiration(currentTime + timeToLive); - } - else - { - contentHeaderProperties.setExpiration(0); - } - } - - contentHeaderProperties.setDeliveryMode((byte) deliveryMode); - contentHeaderProperties.setPriority((byte) priority); - - final int size = (payload != null) ? payload.limit() : 0; - final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); - final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; - - if (payload != null) - { - createContentBodies(payload, frames, 2, _channelId); - } - - if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) - { - _logger.debug("Sending content body frames to " + destination); - } - - - // TODO: This is a hacky way of getting the AMQP class-id for the Basic class - int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz(); - - AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, - classIfForBasic, 0, contentHeaderProperties, size); - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending content header frame to " + destination); - } - - frames[0] = publishFrame; - frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - - try - { - _session.checkFlowControl(); - } - catch (InterruptedException e) - { - JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed"); - jmsEx.setLinkedException(e); - throw jmsEx; - } - - _protocolHandler.writeFrame(compositeFrame, wait); + sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait); if (message != origMessage) { @@ -589,8 +497,9 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - public abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode, - int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait)throws JMSException; + abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, + int deliveryMode, int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean wait)throws JMSException; private void checkTemporaryDestination(AMQDestination destination) throws JMSException { @@ -612,60 +521,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - /** - * Create content bodies. This will split a large message into numerous bodies depending on the negotiated - * maximum frame size. - * - * @param payload - * @param frames - * @param offset - * @param channelId @return the array of content bodies - */ - private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) - { - - if (frames.length == (offset + 1)) - { - frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); - } - else - { - - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - long remaining = payload.remaining(); - for (int i = offset; i < frames.length; i++) - { - payload.position((int) framePayloadMax * (i - offset)); - int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; - payload.limit(payload.position() + length); - frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); - - remaining -= length; - } - } - - } - - private int calculateContentBodyFrameCount(ByteBuffer payload) - { - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). - int frameCount; - if ((payload == null) || (payload.remaining() == 0)) - { - frameCount = 0; - } - else - { - int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; - frameCount = (int) (dataLength / framePayloadMax) + lastFrame; - } - - return frameCount; - } - public void setMimeType(String mimeType) throws JMSException { checkNotClosed(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 2e31c43b4c..5fd6c23f68 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -55,7 +55,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer mandatory, waitUntilSent); } - public void declareDestination(AMQDestination destination) + void declareDestination(AMQDestination destination) { ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(), destination.getExchangeClass().toString(), @@ -67,9 +67,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer /** * Sends a message to a given destination */ - public void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, - int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, - boolean wait) throws JMSException + void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, + int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, + boolean wait) throws JMSException { message.prepareForSending(); if (message.get010Message() == null) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index d6ecb129c8..ff991b1a03 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -45,17 +45,18 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent); } - public void declareDestination(AMQDestination destination) + void declareDestination(AMQDestination destination) { + ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(), - destination.getExchangeName(), - destination.getExchangeClass(), - false, - false, - false, - false, - true, - null); + destination.getExchangeName(), + destination.getExchangeClass(), + false, + false, + false, + false, + true, + null); // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false @@ -64,17 +65,15 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer _protocolHandler.writeFrame(declare); } - public void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message, - int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message, + int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate, + boolean wait) throws JMSException { -// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(), - destination.getExchangeName(), - destination.getRoutingKey(), - mandatory, - immediate); + destination.getExchangeName(), + destination.getRoutingKey(), + mandatory, + immediate); AMQFrame publishFrame = body.generateFrame(_channelId); @@ -114,17 +113,13 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer _logger.debug("Sending content body frames to " + destination); } + // TODO: This is a hacky way of getting the AMQP class-id for the Basic class int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz(); - // weight argument of zero indicates no child content headers, just bodies - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, classIfForBasic, 0, contentHeaderProperties, size); - if (_logger.isDebugEnabled()) { _logger.debug("Sending content header frame to " + destination); @@ -133,17 +128,19 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer frames[0] = publishFrame; frames[1] = contentHeaderFrame; CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - _protocolHandler.writeFrame(compositeFrame, wait); - if (message != origMessage) + try { - _logger.debug("Updating original message"); - origMessage.setJMSPriority(message.getJMSPriority()); - origMessage.setJMSTimestamp(message.getJMSTimestamp()); - _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); - origMessage.setJMSExpiration(message.getJMSExpiration()); - origMessage.setJMSMessageID(message.getJMSMessageID()); + _session.checkFlowControl(); } + catch (InterruptedException e) + { + JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed"); + jmsEx.setLinkedException(e); + throw jmsEx; + } + + _protocolHandler.writeFrame(compositeFrame, wait); } /** |