diff options
Diffstat (limited to 'java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r-- | java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java | 484 |
1 files changed, 484 insertions, 0 deletions
diff --git a/java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java b/java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java new file mode 100644 index 0000000000..14cafc3558 --- /dev/null +++ b/java/java/client/src/org/apache/qpid/client/BasicMessageProducer.java @@ -0,0 +1,484 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.JMSBytesMessage; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.framing.*; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import java.io.UnsupportedEncodingException; + +public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer +{ + protected final Logger _logger = Logger.getLogger(getClass()); + + private AMQConnection _connection; + + /** + * If true, messages will not get a timestamp. + */ + private boolean _disableTimestamps; + + /** + * Priority of messages created by this producer. + */ + private int _messagePriority; + + /** + * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + */ + private long _timeToLive; + + /** + * Delivery mode used for this producer. + */ + private int _deliveryMode = DeliveryMode.PERSISTENT; + + /** + * The Destination used for this consumer, if specified upon creation. + */ + protected AMQDestination _destination; + + /** + * Default encoding used for messages produced by this producer. + */ + private String _encoding; + + /** + * Default encoding used for message produced by this producer. + */ + private String _mimeType; + + private AMQProtocolHandler _protocolHandler; + + /** + * True if this producer was created from a transacted session + */ + private boolean _transacted; + + private int _channelId; + + /** + * This is an id generated by the session and is used to tie individual producers to the session. This means we + * can deregister a producer with the session when the producer is clsoed. We need to be able to tie producers + * to the session so that when an error is propagated to the session it can close the producer (meaning that + * a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). + */ + private long _producerId; + + /** + * The session used to create this producer + */ + private AMQSession _session; + + private final boolean _immediate; + + private final boolean _mandatory; + + private final boolean _waitUntilSent; + + protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, + int channelId, AMQSession session, AMQProtocolHandler protocolHandler, + long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent) + { + _connection = connection; + _destination = destination; + _transacted = transacted; + _protocolHandler = protocolHandler; + _channelId = channelId; + _session = session; + _producerId = producerId; + if (destination != null) + { + declareDestination(destination); + } + _immediate = immediate; + _mandatory = mandatory; + _waitUntilSent = waitUntilSent; + } + + void resubscribe() throws AMQException + { + if (_destination != null) + { + declareDestination(_destination); + } + } + + private void declareDestination(AMQDestination destination) + { + // Declare the exchange + // Note that the durable and internal arguments are ignored since passive is set to false + AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), + destination.getExchangeClass(), false, + false, false, false, true, null); + _protocolHandler.writeFrame(declare); + } + + public void setDisableMessageID(boolean b) throws JMSException + { + checkNotClosed(); + // IGNORED + } + + public boolean getDisableMessageID() throws JMSException + { + checkNotClosed(); + // Always false for AMQP + return false; + } + + public void setDisableMessageTimestamp(boolean b) throws JMSException + { + checkNotClosed(); + _disableTimestamps = b; + } + + public boolean getDisableMessageTimestamp() throws JMSException + { + checkNotClosed(); + return _disableTimestamps; + } + + public void setDeliveryMode(int i) throws JMSException + { + checkNotClosed(); + if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT) + { + throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + + " is illegal"); + } + _deliveryMode = i; + } + + public int getDeliveryMode() throws JMSException + { + checkNotClosed(); + return _deliveryMode; + } + + public void setPriority(int i) throws JMSException + { + checkNotClosed(); + if (i < 0 || i > 9) + { + throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9"); + } + _messagePriority = i; + } + + public int getPriority() throws JMSException + { + checkNotClosed(); + return _messagePriority; + } + + public void setTimeToLive(long l) throws JMSException + { + checkNotClosed(); + if (l < 0) + { + throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l); + } + _timeToLive = l; + } + + public long getTimeToLive() throws JMSException + { + checkNotClosed(); + return _timeToLive; + } + + public Destination getDestination() throws JMSException + { + checkNotClosed(); + return _destination; + } + + public void close() throws JMSException + { + _closed.set(true); + _session.deregisterProducer(_producerId); + } + + public void send(Message message) throws JMSException + { + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, + _mandatory, _immediate); + } + } + + public void send(Message message, int deliveryMode) throws JMSException + { + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, + _mandatory, _immediate); + } + } + + public void send(Message message, int deliveryMode, boolean immediate) throws JMSException + { + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, + _mandatory, immediate); + } + } + + public void send(Message message, int deliveryMode, int priority, + long timeToLive) throws JMSException + { + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory, + _immediate); + } + } + + public void send(Destination destination, Message message) throws JMSException + { + checkNotClosed(); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, + _mandatory, _immediate); + } + } + + public void send(Destination destination, Message message, int deliveryMode, + int priority, long timeToLive) + throws JMSException + { + checkNotClosed(); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + _mandatory, _immediate); + } + } + + public void send(Destination destination, Message message, int deliveryMode, + int priority, long timeToLive, boolean mandatory) + throws JMSException + { + checkNotClosed(); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + mandatory, _immediate); + } + } + + public void send(Destination destination, Message message, int deliveryMode, + int priority, long timeToLive, boolean mandatory, boolean immediate) + throws JMSException + { + checkNotClosed(); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + mandatory, immediate); + } + } + + public void send(Destination destination, Message message, int deliveryMode, + int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean waitUntilSent) + throws JMSException + { + checkNotClosed(); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + mandatory, immediate, waitUntilSent); + } + } + + private void validateDestination(Destination destination) throws JMSException + { + if (!(destination instanceof AMQDestination)) + { + throw new JMSException("Unsupported destination class: " + + (destination != null ? destination.getClass() : null)); + } + declareDestination((AMQDestination)destination); + } + + protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, + long timeToLive, boolean mandatory, boolean immediate) throws JMSException + { + sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); + } + + /** + * The caller of this method must hold the failover mutex. + * @param destination + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @param mandatory + * @param immediate + * @throws JMSException + */ + protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, + long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + { + AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), + destination.getRoutingKey(), mandatory, immediate); + + long currentTime = 0; + if (!_disableTimestamps) + { + currentTime = System.currentTimeMillis(); + message.setJMSTimestamp(currentTime); + } + // + // Very nasty temporary hack for GRM-206. Will be altered ASAP. + // + if (message instanceof JMSBytesMessage) + { + JMSBytesMessage msg = (JMSBytesMessage) message; + if (!msg.isReadable()) + { + msg.reset(); + } + } + ByteBuffer payload = message.getData(); + BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties(); + + if (timeToLive > 0) + { + if (!_disableTimestamps) + { + contentHeaderProperties.setExpiration(currentTime + timeToLive); + } + } + else + { + if (!_disableTimestamps) + { + contentHeaderProperties.setExpiration(0); + } + } + contentHeaderProperties.setDeliveryMode((byte) deliveryMode); + contentHeaderProperties.setPriority((byte) priority); + + int size = payload.limit(); + ContentBody[] contentBodies = createContentBodies(payload); + AMQFrame[] frames = new AMQFrame[2 + contentBodies.length]; + for (int i = 0; i < contentBodies.length; i++) + { + frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]); + } + if (contentBodies.length > 0 && _logger.isDebugEnabled()) + { + _logger.debug("Sending content body frames to " + destination); + } + + // weight argument of zero indicates no child content headers, just bodies + AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0, + contentHeaderProperties, + size); + if (_logger.isDebugEnabled()) + { + _logger.debug("Sending content header frame to " + destination); + } + + frames[0] = publishFrame; + frames[1] = contentHeaderFrame; + CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + _protocolHandler.writeFrame(compositeFrame, wait); + } + + /** + * Create content bodies. This will split a large message into numerous bodies depending on the negotiated + * maximum frame size. + * @param payload + * @return the array of content bodies + */ + private ContentBody[] createContentBodies(ByteBuffer payload) + { + if (payload == null) + { + return null; + } + else if (payload.remaining() == 0) + { + return new ContentBody[0]; + } + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame + // (0xCE byte). + int dataLength = payload.remaining(); + final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; + int frameCount = (int) (dataLength / framePayloadMax) + lastFrame; + final ContentBody[] bodies = new ContentBody[frameCount]; + + if (frameCount == 1) + { + bodies[0] = new ContentBody(); + bodies[0].payload = payload; + } + else + { + long remaining = dataLength; + for (int i = 0; i < bodies.length; i++) + { + bodies[i] = new ContentBody(); + payload.position((int)framePayloadMax * i); + int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining; + payload.limit(payload.position() + length); + bodies[i].payload = payload.slice(); + remaining -= length; + } + } + return bodies; + } + + public void setMimeType(String mimeType) throws JMSException + { + checkNotClosed(); + _mimeType = mimeType; + } + + public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException + { + checkNotClosed(); + _encoding = encoding; + } +} |