summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java100
1 files changed, 55 insertions, 45 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 9b3b2ce0e9..98fa6de675 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client;
-import java.io.UnsupportedEncodingException;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@@ -36,15 +35,15 @@ import javax.jms.Topic;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
+
+
enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
private final Logger _logger ;
@@ -71,18 +70,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private AMQDestination _destination;
/**
- * Default encoding used for messages produced by this producer.
- */
- private String _encoding;
-
- /**
- * Default encoding used for message produced by this producer.
- */
- private String _mimeType;
-
- private AMQProtocolHandler _protocolHandler;
-
- /**
* True if this producer was created from a transacted session
*/
private boolean _transacted;
@@ -135,14 +122,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- Boolean immediate, Boolean mandatory) throws AMQException
+ AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
_logger = logger;
_connection = connection;
_destination = destination;
_transacted = transacted;
- _protocolHandler = protocolHandler;
_channelId = channelId;
_session = session;
_producerId = producerId;
@@ -163,6 +148,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
setPublishMode();
}
+ protected AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
void setPublishMode()
{
// Publish mode could be configured at destination level as well.
@@ -303,7 +293,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
checkPreConditions();
checkInitialDestination();
-
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
@@ -467,7 +456,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
JMSException ex = new JMSException("Error validating destination");
ex.initCause(e);
ex.setLinkedException(e);
-
+
throw ex;
}
amqDestination.setExchangeExistsChecked(true);
@@ -558,19 +547,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- public void setMimeType(String mimeType) throws JMSException
- {
- checkNotClosed();
- _mimeType = mimeType;
- }
-
- public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException
- {
- checkNotClosed();
- _encoding = encoding;
- }
-
- private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+ private void checkPreConditions() throws JMSException
{
checkNotClosed();
@@ -584,15 +561,16 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
- private void checkInitialDestination()
+ private void checkInitialDestination() throws JMSException
{
if (_destination == null)
{
throw new UnsupportedOperationException("Destination is null");
}
+ checkValidQueue();
}
- private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+ private void checkDestination(Destination suppliedDestination) throws JMSException
{
if ((_destination != null) && (suppliedDestination != null))
{
@@ -600,6 +578,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
"This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
+ if(suppliedDestination instanceof AMQQueue)
+ {
+ AMQQueue destination = (AMQQueue) suppliedDestination;
+ checkValidQueue(destination);
+ }
if (suppliedDestination == null)
{
throw new InvalidDestinationException("Supplied Destination was invalid");
@@ -607,6 +590,43 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
+ private void checkValidQueue() throws JMSException
+ {
+ if(_destination instanceof AMQQueue)
+ {
+ checkValidQueue((AMQQueue) _destination);
+ }
+ }
+
+ private void checkValidQueue(AMQQueue destination) throws JMSException
+ {
+ if (!destination.isCheckedForQueueBinding() && validateQueueOnSend())
+ {
+ if (getSession().isStrictAMQP())
+ {
+ getLogger().warn("AMQP does not support destination validation before publish");
+ destination.setCheckedForQueueBinding(true);
+ }
+ else
+ {
+ if (isBound(destination))
+ {
+ destination.setCheckedForQueueBinding(true);
+ }
+ else
+ {
+ throw new InvalidDestinationException("Queue: " + destination.getQueueName()
+ + " is not a valid destination (no binding on server)");
+ }
+ }
+ }
+ }
+
+ private boolean validateQueueOnSend()
+ {
+ return _connection.validateQueueOnSend();
+ }
+
/**
* The session used to create this producer
*/
@@ -645,16 +665,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_destination = destination;
}
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _protocolHandler;
- }
-
- protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
- {
- _protocolHandler = protocolHandler;
- }
-
protected int getChannelId()
{
return _channelId;