diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 58 |
1 files changed, 53 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6c9fcc0f4c..5c48d73e43 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -268,6 +268,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Indicates whether persistent messages are synchronized private boolean _syncPersistence; + //Indicates whether we need to sync on every message ack + private boolean _syncAck; + + //Indicates the sync publish options (persistent|all) + //By default it's async publish + private String _syncPublish = ""; + /** * @param broker brokerdetails * @param username username @@ -348,25 +355,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { // set this connection maxPrefetch - if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) { - _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH)); } else { // use the defaul value set for all connections _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, - ClientProperties.MAX_PREFETCH_DEFAULT)); + ClientProperties.MAX_PREFETCH_DEFAULT)); } - if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) { - _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE)); + _syncPersistence = + Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); } else { // use the defaul value set for all connections _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); + if (_syncPersistence) + { + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); + } + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null) + { + _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK)); + } + else + { + // use the defaul value set for all connections + _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME); + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null) + { + _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH); + } + else + { + // use the defaul value set for all connections + _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); } _failoverPolicy = new FailoverPolicy(connectionURL, this); @@ -1469,6 +1504,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _syncPersistence; } + /** + * Indicates whether we need to sync on every message ack + */ + public boolean getSyncAck() + { + return _syncAck; + } + + public String getSyncPublish() + { + return _syncPublish; + } + public void setIdleTimeout(long l) { _delegate.setIdleTimeout(l); |