summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java58
1 files changed, 53 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 6c9fcc0f4c..5c48d73e43 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -268,6 +268,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
+ //Indicates whether we need to sync on every message ack
+ private boolean _syncAck;
+
+ //Indicates the sync publish options (persistent|all)
+ //By default it's async publish
+ private String _syncPublish = "";
+
/**
* @param broker brokerdetails
* @param username username
@@ -348,25 +355,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
// set this connection maxPrefetch
- if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
- _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
}
else
{
// use the defaul value set for all connections
_maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
- ClientProperties.MAX_PREFETCH_DEFAULT));
+ ClientProperties.MAX_PREFETCH_DEFAULT));
}
- if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
{
- _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE));
+ _syncPersistence =
+ Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
}
else
{
// use the defaul value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
+ if (_syncPersistence)
+ {
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
+ }
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
+ {
+ _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
+ }
+ else
+ {
+ // use the defaul value set for all connections
+ _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
+ {
+ _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
+ }
+ else
+ {
+ // use the defaul value set for all connections
+ _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
_failoverPolicy = new FailoverPolicy(connectionURL, this);
@@ -1469,6 +1504,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _syncPersistence;
}
+ /**
+ * Indicates whether we need to sync on every message ack
+ */
+ public boolean getSyncAck()
+ {
+ return _syncAck;
+ }
+
+ public String getSyncPublish()
+ {
+ return _syncPublish;
+ }
+
public void setIdleTimeout(long l)
{
_delegate.setIdleTimeout(l);