diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java | 601 |
1 files changed, 601 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java new file mode 100644 index 0000000000..8756ac4d05 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -0,0 +1,601 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.io.UnsupportedEncodingException; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageConverter; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer +{ + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; + + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + + private AMQConnection _connection; + + /** + * If true, messages will not get a timestamp. + */ + protected boolean _disableTimestamps; + + /** + * Priority of messages created by this producer. + */ + private int _messagePriority = Message.DEFAULT_PRIORITY; + + /** + * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + */ + private long _timeToLive; + + /** + * Delivery mode used for this producer. + */ + private int _deliveryMode = DeliveryMode.PERSISTENT; + + /** + * The Destination used for this consumer, if specified upon creation. + */ + protected AMQDestination _destination; + + /** + * Default encoding used for messages produced by this producer. + */ + private String _encoding; + + /** + * Default encoding used for message produced by this producer. + */ + private String _mimeType; + + protected AMQProtocolHandler _protocolHandler; + + /** + * True if this producer was created from a transacted session + */ + private boolean _transacted; + + protected int _channelId; + + /** + * This is an id generated by the session and is used to tie individual producers to the session. This means we + * can deregister a producer with the session when the producer is clsoed. We need to be able to tie producers + * to the session so that when an error is propagated to the session it can close the producer (meaning that + * a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). + */ + private long _producerId; + + /** + * The session used to create this producer + */ + protected AMQSession _session; + + private final boolean _immediate; + + private final boolean _mandatory; + + private final boolean _waitUntilSent; + + private boolean _disableMessageId; + + private UUIDGen _messageIdGenerator = UUIDs.newGenerator(); + + protected String _userID; // ref user id used in the connection. + + private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; + + protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; + + protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory, + boolean waitUntilSent) throws AMQException + { + _connection = connection; + _destination = destination; + _transacted = transacted; + _protocolHandler = protocolHandler; + _channelId = channelId; + _session = session; + _producerId = producerId; + if (destination != null && !(destination instanceof AMQUndefinedDestination)) + { + declareDestination(destination); + } + + _immediate = immediate; + _mandatory = mandatory; + _waitUntilSent = waitUntilSent; + _userID = connection.getUsername(); + setPublishMode(); + } + + void setPublishMode() + { + // Publish mode could be configured at destination level as well. + // Will add support for this when we provide a more robust binding URL + + String syncPub = _connection.getSyncPublish(); + // Support for deprecated option sync_persistence + if (syncPub.equals("persistent") || _connection.getSyncPersistence()) + { + publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT; + } + else if (syncPub.equals("all")) + { + publishMode = PublishMode.SYNC_PUBLISH_ALL; + } + + _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode); + } + + void resubscribe() throws AMQException + { + if (_destination != null && !(_destination instanceof AMQUndefinedDestination)) + { + declareDestination(_destination); + } + } + + abstract void declareDestination(AMQDestination destination) throws AMQException; + + public void setDisableMessageID(boolean b) throws JMSException + { + checkPreConditions(); + checkNotClosed(); + _disableMessageId = b; + } + + public boolean getDisableMessageID() throws JMSException + { + checkNotClosed(); + + return _disableMessageId; + } + + public void setDisableMessageTimestamp(boolean b) throws JMSException + { + checkPreConditions(); + _disableTimestamps = b; + } + + public boolean getDisableMessageTimestamp() throws JMSException + { + checkNotClosed(); + + return _disableTimestamps; + } + + public void setDeliveryMode(int i) throws JMSException + { + checkPreConditions(); + if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT)) + { + throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + + " is illegal"); + } + + _deliveryMode = i; + } + + public int getDeliveryMode() throws JMSException + { + checkNotClosed(); + + return _deliveryMode; + } + + public void setPriority(int i) throws JMSException + { + checkPreConditions(); + if ((i < 0) || (i > 9)) + { + throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9"); + } + + _messagePriority = i; + } + + public int getPriority() throws JMSException + { + checkNotClosed(); + + return _messagePriority; + } + + public void setTimeToLive(long l) throws JMSException + { + checkPreConditions(); + if (l < 0) + { + throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l); + } + + _timeToLive = l; + } + + public long getTimeToLive() throws JMSException + { + checkNotClosed(); + + return _timeToLive; + } + + public Destination getDestination() throws JMSException + { + checkNotClosed(); + + return _destination; + } + + public void close() + { + _closed.set(true); + _session.deregisterProducer(_producerId); + } + + public void send(Message message) throws JMSException + { + checkPreConditions(); + checkInitialDestination(); + + + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); + } + } + + public void send(Message message, int deliveryMode) throws JMSException + { + checkPreConditions(); + checkInitialDestination(); + + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); + } + } + + public void send(Message message, int deliveryMode, boolean immediate) throws JMSException + { + checkPreConditions(); + checkInitialDestination(); + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate); + } + } + + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException + { + checkPreConditions(); + checkInitialDestination(); + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); + } + } + + public void send(Destination destination, Message message) throws JMSException + { + checkPreConditions(); + checkDestination(destination); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, + _immediate); + } + } + + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkPreConditions(); + checkDestination(destination); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); + } + } + + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory) throws JMSException + { + checkPreConditions(); + checkDestination(destination); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate); + } + } + + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate) throws JMSException + { + checkPreConditions(); + checkDestination(destination); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate); + } + } + + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException + { + checkPreConditions(); + checkDestination(destination); + synchronized (_connection.getFailoverMutex()) + { + validateDestination(destination); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, + waitUntilSent); + } + } + + private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException + { + if (message instanceof AbstractJMSMessage) + { + return (AbstractJMSMessage) message; + } + else + { + AbstractJMSMessage newMessage; + + if (message instanceof BytesMessage) + { + newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage(); + } + else if (message instanceof MapMessage) + { + newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage(); + } + else if (message instanceof ObjectMessage) + { + newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage(); + } + else if (message instanceof TextMessage) + { + newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage(); + } + else if (message instanceof StreamMessage) + { + newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage(); + } + else + { + newMessage = new MessageConverter(_session, message).getConvertedMessage(); + } + + if (newMessage != null) + { + return newMessage; + } + else + { + throw new JMSException("Unable to send message, due to class conversion error: " + + message.getClass().getName()); + } + } + } + + private void validateDestination(Destination destination) throws JMSException + { + if (!(destination instanceof AMQDestination)) + { + throw new JMSException("Unsupported destination class: " + + ((destination != null) ? destination.getClass() : null)); + } + + AMQDestination amqDestination = (AMQDestination) destination; + if(!amqDestination.isExchangeExistsChecked()) + { + try + { + declareDestination(amqDestination); + } + catch(Exception e) + { + JMSException ex = new JMSException("Error validating destination"); + ex.initCause(e); + ex.setLinkedException(e); + + throw ex; + } + amqDestination.setExchangeExistsChecked(true); + } + } + + protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate) throws JMSException + { + sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); + } + + /** + * The caller of this method must hold the failover mutex. + * + * @param destination + * @param origMessage + * @param deliveryMode + * @param priority + * @param timeToLive + * @param mandatory + * @param immediate + * + * @throws JMSException + */ + protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate, boolean wait) throws JMSException + { + checkTemporaryDestination(destination); + origMessage.setJMSDestination(destination); + + AbstractJMSMessage message = convertToNativeMessage(origMessage); + + if (_transacted) + { + if (_session.hasFailedOver() && _session.isDirty()) + { + throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.", + new AMQSessionDirtyException("Failover has occurred and session is dirty " + + "so unable to send.")); + } + } + + UUID messageId = null; + if (_disableMessageId) + { + message.setJMSMessageID((UUID)null); + } + else + { + messageId = _messageIdGenerator.generate(); + message.setJMSMessageID(messageId); + } + + sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait); + + if (message != origMessage) + { + _logger.debug("Updating original message"); + origMessage.setJMSPriority(message.getJMSPriority()); + origMessage.setJMSTimestamp(message.getJMSTimestamp()); + _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); + origMessage.setJMSExpiration(message.getJMSExpiration()); + origMessage.setJMSMessageID(message.getJMSMessageID()); + } + + if (_transacted) + { + _session.markDirty(); + } + } + + abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, + UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean wait) throws JMSException; + + private void checkTemporaryDestination(AMQDestination destination) throws JMSException + { + if (destination instanceof TemporaryDestination) + { + _logger.debug("destination is temporary destination"); + TemporaryDestination tempDest = (TemporaryDestination) destination; + if (tempDest.getSession().isClosed()) + { + _logger.debug("session is closed"); + throw new JMSException("Session for temporary destination has been closed"); + } + + if (tempDest.isDeleted()) + { + _logger.debug("destination is deleted"); + throw new JMSException("Cannot send to a deleted temporary destination"); + } + } + } + + public void setMimeType(String mimeType) throws JMSException + { + checkNotClosed(); + _mimeType = mimeType; + } + + public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException + { + checkNotClosed(); + _encoding = encoding; + } + + private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException + { + checkNotClosed(); + + if ((_session == null) || _session.isClosed()) + { + throw new javax.jms.IllegalStateException("Invalid Session"); + } + if(_session.getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection closed"); + } + } + + private void checkInitialDestination() + { + if (_destination == null) + { + throw new UnsupportedOperationException("Destination is null"); + } + } + + private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException + { + if ((_destination != null) && (suppliedDestination != null)) + { + throw new UnsupportedOperationException( + "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); + } + + if (suppliedDestination == null) + { + throw new InvalidDestinationException("Supplied Destination was invalid"); + } + + } + + public AMQSession getSession() + { + return _session; + } + + public boolean isBound(AMQDestination destination) throws JMSException + { + return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey()); + } +} |