diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-03-10 19:22:10 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-03-10 19:22:10 +0000 |
commit | 4eaa4e42093e5524d9552d8fa312c214524b6bb4 (patch) | |
tree | a251d57ee92d9c779fe4455c583be0ed90e69a43 /qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java | |
parent | 92be7e8f3163c048a8642d2deeaa921bbb65dc9c (diff) | |
download | qpid-python-rg-amqp-1-0-sandbox.tar.gz |
NO-JIRA : AMQP-1-0 sandbox updates - merge from trunkrg-amqp-1-0-sandbox
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1299257 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java | 60 |
1 files changed, 31 insertions, 29 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index b2f998cb2c..21ff6c877a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -20,18 +20,9 @@ */ package org.apache.qpid.client; -import java.util.UUID; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Topic; -import javax.jms.Queue; - -import java.nio.ByteBuffer; - import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -42,13 +33,24 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.Topic; +import java.nio.ByteBuffer; +import java.util.UUID; + public class BasicMessageProducer_0_8 extends BasicMessageProducer { + private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class); BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { - super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); + super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); } void declareDestination(AMQDestination destination) @@ -56,7 +58,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer final MethodRegistry methodRegistry = getSession().getMethodRegistry(); ExchangeDeclareBody body = - methodRegistry.createExchangeDeclareBody(_session.getTicket(), + methodRegistry.createExchangeDeclareBody(getSession().getTicket(), destination.getExchangeName(), destination.getExchangeClass(), destination.getExchangeName().toString().startsWith("amq."), @@ -68,29 +70,29 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false - AMQFrame declare = body.generateFrame(_channelId); + AMQFrame declare = body.generateFrame(getChannelId()); - _protocolHandler.writeFrame(declare); + getProtocolHandler().writeFrame(declare); } void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { - BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(), + BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(), destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); - AMQFrame publishFrame = body.generateFrame(_channelId); + AMQFrame publishFrame = body.generateFrame(getChannelId()); message.prepareForSending(); ByteBuffer payload = message.getData(); AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); - contentHeaderProperties.setUserId(_userID); + contentHeaderProperties.setUserId(getUserID()); //Set the JMS_QPID_DESTTYPE for 0-8/9 messages int type; @@ -110,7 +112,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer //Set JMS_QPID_DESTTYPE delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); - if (!_disableTimestamps) + if (!isDisableTimestamps()) { final long currentTime = System.currentTimeMillis(); contentHeaderProperties.setTimestamp(currentTime); @@ -134,12 +136,12 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer if (payload != null) { - createContentBodies(payload, frames, 2, _channelId); + createContentBodies(payload, frames, 2, getChannelId()); } - if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) + if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled()) { - _logger.debug("Sending content body frames to " + destination); + getLogger().debug("Sending content body frames to " + destination); } @@ -147,11 +149,11 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz(); AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, + ContentHeaderBody.createAMQFrame(getChannelId(), classIfForBasic, 0, contentHeaderProperties, size); - if (_logger.isDebugEnabled()) + if (getLogger().isDebugEnabled()) { - _logger.debug("Sending content header frame to " + destination); + getLogger().debug("Sending content header frame to " + destination); } frames[0] = publishFrame; @@ -160,7 +162,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer try { - _session.checkFlowControl(); + getSession().checkFlowControl(); } catch (InterruptedException e) { @@ -170,7 +172,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer throw jmse; } - _protocolHandler.writeFrame(compositeFrame); + getProtocolHandler().writeFrame(compositeFrame); } /** @@ -194,7 +196,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer else { - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1; long remaining = payload.remaining(); for (int i = offset; i < frames.length; i++) { @@ -224,7 +226,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer else { int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1; int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; frameCount = (int) (dataLength / framePayloadMax) + lastFrame; } |