summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java60
1 files changed, 31 insertions, 29 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index b2f998cb2c..21ff6c877a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -20,18 +20,9 @@
*/
package org.apache.qpid.client;
-import java.util.UUID;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Topic;
-import javax.jms.Queue;
-
-import java.nio.ByteBuffer;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -42,13 +33,24 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
+ private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class);
BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
- super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
}
void declareDestination(AMQDestination destination)
@@ -56,7 +58,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
final MethodRegistry methodRegistry = getSession().getMethodRegistry();
ExchangeDeclareBody body =
- methodRegistry.createExchangeDeclareBody(_session.getTicket(),
+ methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
destination.getExchangeName(),
destination.getExchangeClass(),
destination.getExchangeName().toString().startsWith("amq."),
@@ -68,29 +70,29 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
- AMQFrame declare = body.generateFrame(_channelId);
+ AMQFrame declare = body.generateFrame(getChannelId());
- _protocolHandler.writeFrame(declare);
+ getProtocolHandler().writeFrame(declare);
}
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
boolean immediate) throws JMSException
{
- BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
+ BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
destination.getExchangeName(),
destination.getRoutingKey(),
mandatory,
immediate);
- AMQFrame publishFrame = body.generateFrame(_channelId);
+ AMQFrame publishFrame = body.generateFrame(getChannelId());
message.prepareForSending();
ByteBuffer payload = message.getData();
AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
- contentHeaderProperties.setUserId(_userID);
+ contentHeaderProperties.setUserId(getUserID());
//Set the JMS_QPID_DESTTYPE for 0-8/9 messages
int type;
@@ -110,7 +112,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
//Set JMS_QPID_DESTTYPE
delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
- if (!_disableTimestamps)
+ if (!isDisableTimestamps())
{
final long currentTime = System.currentTimeMillis();
contentHeaderProperties.setTimestamp(currentTime);
@@ -134,12 +136,12 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
if (payload != null)
{
- createContentBodies(payload, frames, 2, _channelId);
+ createContentBodies(payload, frames, 2, getChannelId());
}
- if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
+ if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled())
{
- _logger.debug("Sending content body frames to " + destination);
+ getLogger().debug("Sending content body frames to " + destination);
}
@@ -147,11 +149,11 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
+ ContentHeaderBody.createAMQFrame(getChannelId(),
classIfForBasic, 0, contentHeaderProperties, size);
- if (_logger.isDebugEnabled())
+ if (getLogger().isDebugEnabled())
{
- _logger.debug("Sending content header frame to " + destination);
+ getLogger().debug("Sending content header frame to " + destination);
}
frames[0] = publishFrame;
@@ -160,7 +162,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
try
{
- _session.checkFlowControl();
+ getSession().checkFlowControl();
}
catch (InterruptedException e)
{
@@ -170,7 +172,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
throw jmse;
}
- _protocolHandler.writeFrame(compositeFrame);
+ getProtocolHandler().writeFrame(compositeFrame);
}
/**
@@ -194,7 +196,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
else
{
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
long remaining = payload.remaining();
for (int i = offset; i < frames.length; i++)
{
@@ -224,7 +226,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
else
{
int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
}