diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 954a3bc28f..5ff6066ddc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; + protected final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQConnection _connection; @@ -120,6 +122,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac protected String _userID; // ref user id used in the connection. private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; + + protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory, @@ -141,6 +145,26 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _mandatory = mandatory; _waitUntilSent = waitUntilSent; _userID = connection.getUsername(); + setPublishMode(); + } + + void setPublishMode() + { + // Publish mode could be configured at destination level as well. + // Will add support for this when we provide a more robust binding URL + + String syncPub = _connection.getSyncPublish(); + // Support for deprecated option sync_persistence + if (syncPub.equals("persistent") || _connection.getSyncPersistence()) + { + publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT; + } + else if (syncPub.equals("all")) + { + publishMode = PublishMode.SYNC_PUBLISH_ALL; + } + + _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode); } void resubscribe() throws AMQException |