summaryrefslogtreecommitdiff
path: root/java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r--java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java484
1 files changed, 0 insertions, 484 deletions
diff --git a/java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java b/java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
deleted file mode 100644
index 14cafc3558..0000000000
--- a/java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.*;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import java.io.UnsupportedEncodingException;
-
-public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
-{
- protected final Logger _logger = Logger.getLogger(getClass());
-
- private AMQConnection _connection;
-
- /**
- * If true, messages will not get a timestamp.
- */
- private boolean _disableTimestamps;
-
- /**
- * Priority of messages created by this producer.
- */
- private int _messagePriority;
-
- /**
- * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
- */
- private long _timeToLive;
-
- /**
- * Delivery mode used for this producer.
- */
- private int _deliveryMode = DeliveryMode.PERSISTENT;
-
- /**
- * The Destination used for this consumer, if specified upon creation.
- */
- protected AMQDestination _destination;
-
- /**
- * Default encoding used for messages produced by this producer.
- */
- private String _encoding;
-
- /**
- * Default encoding used for message produced by this producer.
- */
- private String _mimeType;
-
- private AMQProtocolHandler _protocolHandler;
-
- /**
- * True if this producer was created from a transacted session
- */
- private boolean _transacted;
-
- private int _channelId;
-
- /**
- * This is an id generated by the session and is used to tie individual producers to the session. This means we
- * can deregister a producer with the session when the producer is clsoed. We need to be able to tie producers
- * to the session so that when an error is propagated to the session it can close the producer (meaning that
- * a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
- */
- private long _producerId;
-
- /**
- * The session used to create this producer
- */
- private AMQSession _session;
-
- private final boolean _immediate;
-
- private final boolean _mandatory;
-
- private final boolean _waitUntilSent;
-
- protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
- int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
- long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent)
- {
- _connection = connection;
- _destination = destination;
- _transacted = transacted;
- _protocolHandler = protocolHandler;
- _channelId = channelId;
- _session = session;
- _producerId = producerId;
- if (destination != null)
- {
- declareDestination(destination);
- }
- _immediate = immediate;
- _mandatory = mandatory;
- _waitUntilSent = waitUntilSent;
- }
-
- void resubscribe() throws AMQException
- {
- if (_destination != null)
- {
- declareDestination(_destination);
- }
- }
-
- private void declareDestination(AMQDestination destination)
- {
- // Declare the exchange
- // Note that the durable and internal arguments are ignored since passive is set to false
- AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getExchangeClass(), false,
- false, false, false, true, null);
- _protocolHandler.writeFrame(declare);
- }
-
- public void setDisableMessageID(boolean b) throws JMSException
- {
- checkNotClosed();
- // IGNORED
- }
-
- public boolean getDisableMessageID() throws JMSException
- {
- checkNotClosed();
- // Always false for AMQP
- return false;
- }
-
- public void setDisableMessageTimestamp(boolean b) throws JMSException
- {
- checkNotClosed();
- _disableTimestamps = b;
- }
-
- public boolean getDisableMessageTimestamp() throws JMSException
- {
- checkNotClosed();
- return _disableTimestamps;
- }
-
- public void setDeliveryMode(int i) throws JMSException
- {
- checkNotClosed();
- if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
- {
- throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
- " is illegal");
- }
- _deliveryMode = i;
- }
-
- public int getDeliveryMode() throws JMSException
- {
- checkNotClosed();
- return _deliveryMode;
- }
-
- public void setPriority(int i) throws JMSException
- {
- checkNotClosed();
- if (i < 0 || i > 9)
- {
- throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
- }
- _messagePriority = i;
- }
-
- public int getPriority() throws JMSException
- {
- checkNotClosed();
- return _messagePriority;
- }
-
- public void setTimeToLive(long l) throws JMSException
- {
- checkNotClosed();
- if (l < 0)
- {
- throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
- }
- _timeToLive = l;
- }
-
- public long getTimeToLive() throws JMSException
- {
- checkNotClosed();
- return _timeToLive;
- }
-
- public Destination getDestination() throws JMSException
- {
- checkNotClosed();
- return _destination;
- }
-
- public void close() throws JMSException
- {
- _closed.set(true);
- _session.deregisterProducer(_producerId);
- }
-
- public void send(Message message) throws JMSException
- {
- synchronized (_connection.getFailoverMutex())
- {
- sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
- }
- }
-
- public void send(Message message, int deliveryMode) throws JMSException
- {
- synchronized (_connection.getFailoverMutex())
- {
- sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
- }
- }
-
- public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
- {
- synchronized (_connection.getFailoverMutex())
- {
- sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
- _mandatory, immediate);
- }
- }
-
- public void send(Message message, int deliveryMode, int priority,
- long timeToLive) throws JMSException
- {
- synchronized (_connection.getFailoverMutex())
- {
- sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
- _immediate);
- }
- }
-
- public void send(Destination destination, Message message) throws JMSException
- {
- checkNotClosed();
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
- }
- }
-
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive)
- throws JMSException
- {
- checkNotClosed();
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
- _mandatory, _immediate);
- }
- }
-
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory)
- throws JMSException
- {
- checkNotClosed();
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
- mandatory, _immediate);
- }
- }
-
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory, boolean immediate)
- throws JMSException
- {
- checkNotClosed();
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
- mandatory, immediate);
- }
- }
-
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean waitUntilSent)
- throws JMSException
- {
- checkNotClosed();
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
- mandatory, immediate, waitUntilSent);
- }
- }
-
- private void validateDestination(Destination destination) throws JMSException
- {
- if (!(destination instanceof AMQDestination))
- {
- throw new JMSException("Unsupported destination class: " +
- (destination != null ? destination.getClass() : null));
- }
- declareDestination((AMQDestination)destination);
- }
-
- protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate) throws JMSException
- {
- sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
- }
-
- /**
- * The caller of this method must hold the failover mutex.
- * @param destination
- * @param message
- * @param deliveryMode
- * @param priority
- * @param timeToLive
- * @param mandatory
- * @param immediate
- * @throws JMSException
- */
- protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
- {
- AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getRoutingKey(), mandatory, immediate);
-
- long currentTime = 0;
- if (!_disableTimestamps)
- {
- currentTime = System.currentTimeMillis();
- message.setJMSTimestamp(currentTime);
- }
- //
- // Very nasty temporary hack for GRM-206. Will be altered ASAP.
- //
- if (message instanceof JMSBytesMessage)
- {
- JMSBytesMessage msg = (JMSBytesMessage) message;
- if (!msg.isReadable())
- {
- msg.reset();
- }
- }
- ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties();
-
- if (timeToLive > 0)
- {
- if (!_disableTimestamps)
- {
- contentHeaderProperties.setExpiration(currentTime + timeToLive);
- }
- }
- else
- {
- if (!_disableTimestamps)
- {
- contentHeaderProperties.setExpiration(0);
- }
- }
- contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
- contentHeaderProperties.setPriority((byte) priority);
-
- int size = payload.limit();
- ContentBody[] contentBodies = createContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
- for (int i = 0; i < contentBodies.length; i++)
- {
- frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]);
- }
- if (contentBodies.length > 0 && _logger.isDebugEnabled())
- {
- _logger.debug("Sending content body frames to " + destination);
- }
-
- // weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0,
- contentHeaderProperties,
- size);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending content header frame to " + destination);
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- _protocolHandler.writeFrame(compositeFrame, wait);
- }
-
- /**
- * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- * maximum frame size.
- * @param payload
- * @return the array of content bodies
- */
- private ContentBody[] createContentBodies(ByteBuffer payload)
- {
- if (payload == null)
- {
- return null;
- }
- else if (payload.remaining() == 0)
- {
- return new ContentBody[0];
- }
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
- int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- final ContentBody[] bodies = new ContentBody[frameCount];
-
- if (frameCount == 1)
- {
- bodies[0] = new ContentBody();
- bodies[0].payload = payload;
- }
- else
- {
- long remaining = dataLength;
- for (int i = 0; i < bodies.length; i++)
- {
- bodies[i] = new ContentBody();
- payload.position((int)framePayloadMax * i);
- int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining;
- payload.limit(payload.position() + length);
- bodies[i].payload = payload.slice();
- remaining -= length;
- }
- }
- return bodies;
- }
-
- public void setMimeType(String mimeType) throws JMSException
- {
- checkNotClosed();
- _mimeType = mimeType;
- }
-
- public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException
- {
- checkNotClosed();
- _encoding = encoding;
- }
-}