summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-04-22 18:01:08 +0000
committerRafael H. Schloming <rhs@apache.org>2008-04-22 18:01:08 +0000
commit09742065d5d765f6bebf5fcd3a9aad9ef325b76d (patch)
tree3117b6bfd9bb26263cc219b68b64ac090c4c2bf2
parentea74745601a57d06633d44235e9bf61b9a88fb55 (diff)
downloadqpid-python-09742065d5d765f6bebf5fcd3a9aad9ef325b76d.tar.gz
QPID-832: moved more 0-8 specific code to 0-8 subclasses
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@650598 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java155
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java59
3 files changed, 37 insertions, 185 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 62dcb90a86..5bb2955399 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -153,25 +153,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- private void declareDestination(AMQDestination destination)
- {
-
- ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
- destination.getExchangeName(),
- destination.getExchangeClass(),
- false,
- false,
- false,
- false,
- true,
- null);
- // Declare the exchange
- // Note that the durable and internal arguments are ignored since passive is set to false
-
- AMQFrame declare = body.generateFrame(_channelId);
-
- _protocolHandler.writeFrame(declare);
- }
+ abstract void declareDestination(AMQDestination destination);
public void setDisableMessageID(boolean b) throws JMSException
{
@@ -497,81 +479,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
type = AMQDestination.UNKNOWN_TYPE;
}
- // message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
-
-
- BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
- destination.getExchangeName(),
- destination.getRoutingKey(),
- mandatory,
- immediate);
-
- AMQFrame publishFrame = body.generateFrame(_channelId);
-
- message.prepareForSending();
- ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
-
- if (!_disableTimestamps)
- {
- final long currentTime = System.currentTimeMillis();
- contentHeaderProperties.setTimestamp(currentTime);
-
- if (timeToLive > 0)
- {
- contentHeaderProperties.setExpiration(currentTime + timeToLive);
- }
- else
- {
- contentHeaderProperties.setExpiration(0);
- }
- }
-
- contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
- contentHeaderProperties.setPriority((byte) priority);
-
- final int size = (payload != null) ? payload.limit() : 0;
- final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
- final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
-
- if (payload != null)
- {
- createContentBodies(payload, frames, 2, _channelId);
- }
-
- if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
- {
- _logger.debug("Sending content body frames to " + destination);
- }
-
-
- // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
- int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
-
- AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- classIfForBasic, 0, contentHeaderProperties, size);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending content header frame to " + destination);
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-
- try
- {
- _session.checkFlowControl();
- }
- catch (InterruptedException e)
- {
- JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
-
- _protocolHandler.writeFrame(compositeFrame, wait);
+ sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
if (message != origMessage)
{
@@ -589,8 +497,9 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- public abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait)throws JMSException;
+ abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+ int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait)throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
@@ -612,60 +521,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- /**
- * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- * maximum frame size.
- *
- * @param payload
- * @param frames
- * @param offset
- * @param channelId @return the array of content bodies
- */
- private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
- {
-
- if (frames.length == (offset + 1))
- {
- frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
- }
- else
- {
-
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- long remaining = payload.remaining();
- for (int i = offset; i < frames.length; i++)
- {
- payload.position((int) framePayloadMax * (i - offset));
- int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
- payload.limit(payload.position() + length);
- frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
-
- remaining -= length;
- }
- }
-
- }
-
- private int calculateContentBodyFrameCount(ByteBuffer payload)
- {
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int frameCount;
- if ((payload == null) || (payload.remaining() == 0))
- {
- frameCount = 0;
- }
- else
- {
- int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
- frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- }
-
- return frameCount;
- }
-
public void setMimeType(String mimeType) throws JMSException
{
checkNotClosed();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 2e31c43b4c..5fd6c23f68 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -55,7 +55,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
mandatory, waitUntilSent);
}
- public void declareDestination(AMQDestination destination)
+ void declareDestination(AMQDestination destination)
{
((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(),
destination.getExchangeClass().toString(),
@@ -67,9 +67,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
/**
* Sends a message to a given destination
*/
- public void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
- int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
- boolean wait) throws JMSException
+ void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+ int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
+ boolean wait) throws JMSException
{
message.prepareForSending();
if (message.get010Message() == null)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index d6ecb129c8..ff991b1a03 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -45,17 +45,18 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent);
}
- public void declareDestination(AMQDestination destination)
+ void declareDestination(AMQDestination destination)
{
+
ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
- destination.getExchangeName(),
- destination.getExchangeClass(),
- false,
- false,
- false,
- false,
- true,
- null);
+ destination.getExchangeName(),
+ destination.getExchangeClass(),
+ false,
+ false,
+ false,
+ false,
+ true,
+ null);
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
@@ -64,17 +65,15 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
_protocolHandler.writeFrame(declare);
}
- public void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message,
- int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message,
+ int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate,
+ boolean wait) throws JMSException
{
-// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
- destination.getExchangeName(),
- destination.getRoutingKey(),
- mandatory,
- immediate);
+ destination.getExchangeName(),
+ destination.getRoutingKey(),
+ mandatory,
+ immediate);
AMQFrame publishFrame = body.generateFrame(_channelId);
@@ -114,17 +113,13 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
_logger.debug("Sending content body frames to " + destination);
}
+
// TODO: This is a hacky way of getting the AMQP class-id for the Basic class
int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
- // weight argument of zero indicates no child content headers, just bodies
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-
AMQFrame contentHeaderFrame =
ContentHeaderBody.createAMQFrame(_channelId,
classIfForBasic, 0, contentHeaderProperties, size);
-
if (_logger.isDebugEnabled())
{
_logger.debug("Sending content header frame to " + destination);
@@ -133,17 +128,19 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- _protocolHandler.writeFrame(compositeFrame, wait);
- if (message != origMessage)
+ try
{
- _logger.debug("Updating original message");
- origMessage.setJMSPriority(message.getJMSPriority());
- origMessage.setJMSTimestamp(message.getJMSTimestamp());
- _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
- origMessage.setJMSExpiration(message.getJMSExpiration());
- origMessage.setJMSMessageID(message.getJMSMessageID());
+ _session.checkFlowControl();
}
+ catch (InterruptedException e)
+ {
+ JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+
+ _protocolHandler.writeFrame(compositeFrame, wait);
}
/**