summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java155
1 files changed, 5 insertions, 150 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 62dcb90a86..5bb2955399 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -153,25 +153,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- private void declareDestination(AMQDestination destination)
- {
-
- ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
- destination.getExchangeName(),
- destination.getExchangeClass(),
- false,
- false,
- false,
- false,
- true,
- null);
- // Declare the exchange
- // Note that the durable and internal arguments are ignored since passive is set to false
-
- AMQFrame declare = body.generateFrame(_channelId);
-
- _protocolHandler.writeFrame(declare);
- }
+ abstract void declareDestination(AMQDestination destination);
public void setDisableMessageID(boolean b) throws JMSException
{
@@ -497,81 +479,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
type = AMQDestination.UNKNOWN_TYPE;
}
- // message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
-
-
- BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
- destination.getExchangeName(),
- destination.getRoutingKey(),
- mandatory,
- immediate);
-
- AMQFrame publishFrame = body.generateFrame(_channelId);
-
- message.prepareForSending();
- ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
-
- if (!_disableTimestamps)
- {
- final long currentTime = System.currentTimeMillis();
- contentHeaderProperties.setTimestamp(currentTime);
-
- if (timeToLive > 0)
- {
- contentHeaderProperties.setExpiration(currentTime + timeToLive);
- }
- else
- {
- contentHeaderProperties.setExpiration(0);
- }
- }
-
- contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
- contentHeaderProperties.setPriority((byte) priority);
-
- final int size = (payload != null) ? payload.limit() : 0;
- final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
- final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
-
- if (payload != null)
- {
- createContentBodies(payload, frames, 2, _channelId);
- }
-
- if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
- {
- _logger.debug("Sending content body frames to " + destination);
- }
-
-
- // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
- int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
-
- AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- classIfForBasic, 0, contentHeaderProperties, size);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending content header frame to " + destination);
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-
- try
- {
- _session.checkFlowControl();
- }
- catch (InterruptedException e)
- {
- JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
-
- _protocolHandler.writeFrame(compositeFrame, wait);
+ sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
if (message != origMessage)
{
@@ -589,8 +497,9 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- public abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait)throws JMSException;
+ abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+ int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait)throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
@@ -612,60 +521,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- /**
- * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- * maximum frame size.
- *
- * @param payload
- * @param frames
- * @param offset
- * @param channelId @return the array of content bodies
- */
- private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
- {
-
- if (frames.length == (offset + 1))
- {
- frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
- }
- else
- {
-
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- long remaining = payload.remaining();
- for (int i = offset; i < frames.length; i++)
- {
- payload.position((int) framePayloadMax * (i - offset));
- int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
- payload.limit(payload.position() + length);
- frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
-
- remaining -= length;
- }
- }
-
- }
-
- private int calculateContentBodyFrameCount(ByteBuffer payload)
- {
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int frameCount;
- if ((payload == null) || (payload.remaining() == 0))
- {
- frameCount = 0;
- }
- else
- {
- int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
- frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- }
-
- return frameCount;
- }
-
public void setMimeType(String mimeType) throws JMSException
{
checkNotClosed();