diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java | 100 |
1 files changed, 55 insertions, 45 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 9b3b2ce0e9..98fa6de675 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.client; -import java.io.UnsupportedEncodingException; import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; @@ -36,15 +35,15 @@ import javax.jms.Topic; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; private final Logger _logger ; @@ -71,18 +70,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private AMQDestination _destination; /** - * Default encoding used for messages produced by this producer. - */ - private String _encoding; - - /** - * Default encoding used for message produced by this producer. - */ - private String _mimeType; - - private AMQProtocolHandler _protocolHandler; - - /** * True if this producer was created from a transacted session */ private boolean _transacted; @@ -135,14 +122,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - Boolean immediate, Boolean mandatory) throws AMQException + AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { _logger = logger; _connection = connection; _destination = destination; _transacted = transacted; - _protocolHandler = protocolHandler; _channelId = channelId; _session = session; _producerId = producerId; @@ -163,6 +148,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac setPublishMode(); } + protected AMQConnection getConnection() + { + return _connection; + } + void setPublishMode() { // Publish mode could be configured at destination level as well. @@ -303,7 +293,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac checkPreConditions(); checkInitialDestination(); - synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); @@ -467,7 +456,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac JMSException ex = new JMSException("Error validating destination"); ex.initCause(e); ex.setLinkedException(e); - + throw ex; } amqDestination.setExchangeExistsChecked(true); @@ -558,19 +547,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - public void setMimeType(String mimeType) throws JMSException - { - checkNotClosed(); - _mimeType = mimeType; - } - - public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException - { - checkNotClosed(); - _encoding = encoding; - } - - private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException + private void checkPreConditions() throws JMSException { checkNotClosed(); @@ -584,15 +561,16 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } - private void checkInitialDestination() + private void checkInitialDestination() throws JMSException { if (_destination == null) { throw new UnsupportedOperationException("Destination is null"); } + checkValidQueue(); } - private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException + private void checkDestination(Destination suppliedDestination) throws JMSException { if ((_destination != null) && (suppliedDestination != null)) { @@ -600,6 +578,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } + if(suppliedDestination instanceof AMQQueue) + { + AMQQueue destination = (AMQQueue) suppliedDestination; + checkValidQueue(destination); + } if (suppliedDestination == null) { throw new InvalidDestinationException("Supplied Destination was invalid"); @@ -607,6 +590,43 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } + private void checkValidQueue() throws JMSException + { + if(_destination instanceof AMQQueue) + { + checkValidQueue((AMQQueue) _destination); + } + } + + private void checkValidQueue(AMQQueue destination) throws JMSException + { + if (!destination.isCheckedForQueueBinding() && validateQueueOnSend()) + { + if (getSession().isStrictAMQP()) + { + getLogger().warn("AMQP does not support destination validation before publish"); + destination.setCheckedForQueueBinding(true); + } + else + { + if (isBound(destination)) + { + destination.setCheckedForQueueBinding(true); + } + else + { + throw new InvalidDestinationException("Queue: " + destination.getQueueName() + + " is not a valid destination (no binding on server)"); + } + } + } + } + + private boolean validateQueueOnSend() + { + return _connection.validateQueueOnSend(); + } + /** * The session used to create this producer */ @@ -645,16 +665,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _destination = destination; } - protected AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - - protected void setProtocolHandler(AMQProtocolHandler protocolHandler) - { - _protocolHandler = protocolHandler; - } - protected int getChannelId() { return _channelId; |