diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java | 155 |
1 files changed, 5 insertions, 150 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 62dcb90a86..5bb2955399 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -153,25 +153,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - private void declareDestination(AMQDestination destination) - { - - ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(), - destination.getExchangeName(), - destination.getExchangeClass(), - false, - false, - false, - false, - true, - null); - // Declare the exchange - // Note that the durable and internal arguments are ignored since passive is set to false - - AMQFrame declare = body.generateFrame(_channelId); - - _protocolHandler.writeFrame(declare); - } + abstract void declareDestination(AMQDestination destination); public void setDisableMessageID(boolean b) throws JMSException { @@ -497,81 +479,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac type = AMQDestination.UNKNOWN_TYPE; } - // message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); - - - BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(), - destination.getExchangeName(), - destination.getRoutingKey(), - mandatory, - immediate); - - AMQFrame publishFrame = body.generateFrame(_channelId); - - message.prepareForSending(); - ByteBuffer payload = message.getData(); - BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); - - if (!_disableTimestamps) - { - final long currentTime = System.currentTimeMillis(); - contentHeaderProperties.setTimestamp(currentTime); - - if (timeToLive > 0) - { - contentHeaderProperties.setExpiration(currentTime + timeToLive); - } - else - { - contentHeaderProperties.setExpiration(0); - } - } - - contentHeaderProperties.setDeliveryMode((byte) deliveryMode); - contentHeaderProperties.setPriority((byte) priority); - - final int size = (payload != null) ? payload.limit() : 0; - final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); - final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; - - if (payload != null) - { - createContentBodies(payload, frames, 2, _channelId); - } - - if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) - { - _logger.debug("Sending content body frames to " + destination); - } - - - // TODO: This is a hacky way of getting the AMQP class-id for the Basic class - int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz(); - - AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, - classIfForBasic, 0, contentHeaderProperties, size); - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending content header frame to " + destination); - } - - frames[0] = publishFrame; - frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - - try - { - _session.checkFlowControl(); - } - catch (InterruptedException e) - { - JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed"); - jmsEx.setLinkedException(e); - throw jmsEx; - } - - _protocolHandler.writeFrame(compositeFrame, wait); + sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait); if (message != origMessage) { @@ -589,8 +497,9 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - public abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode, - int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait)throws JMSException; + abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, + int deliveryMode, int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean wait)throws JMSException; private void checkTemporaryDestination(AMQDestination destination) throws JMSException { @@ -612,60 +521,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - /** - * Create content bodies. This will split a large message into numerous bodies depending on the negotiated - * maximum frame size. - * - * @param payload - * @param frames - * @param offset - * @param channelId @return the array of content bodies - */ - private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) - { - - if (frames.length == (offset + 1)) - { - frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); - } - else - { - - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - long remaining = payload.remaining(); - for (int i = offset; i < frames.length; i++) - { - payload.position((int) framePayloadMax * (i - offset)); - int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; - payload.limit(payload.position() + length); - frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); - - remaining -= length; - } - } - - } - - private int calculateContentBodyFrameCount(ByteBuffer payload) - { - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). - int frameCount; - if ((payload == null) || (payload.remaining() == 0)) - { - frameCount = 0; - } - else - { - int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; - frameCount = (int) (dataLength / framePayloadMax) + lastFrame; - } - - return frameCount; - } - public void setMimeType(String mimeType) throws JMSException { checkNotClosed(); |