summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java58
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java52
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java117
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java74
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java24
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java23
14 files changed, 330 insertions, 173 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 6c9fcc0f4c..5c48d73e43 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -268,6 +268,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
+ //Indicates whether we need to sync on every message ack
+ private boolean _syncAck;
+
+ //Indicates the sync publish options (persistent|all)
+ //By default it's async publish
+ private String _syncPublish = "";
+
/**
* @param broker brokerdetails
* @param username username
@@ -348,25 +355,53 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
// set this connection maxPrefetch
- if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
- _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
}
else
{
// use the defaul value set for all connections
_maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
- ClientProperties.MAX_PREFETCH_DEFAULT));
+ ClientProperties.MAX_PREFETCH_DEFAULT));
}
- if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
{
- _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE));
+ _syncPersistence =
+ Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
}
else
{
// use the defaul value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
+ if (_syncPersistence)
+ {
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
+ }
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
+ {
+ _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
+ }
+ else
+ {
+ // use the defaul value set for all connections
+ _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
+ {
+ _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
+ }
+ else
+ {
+ // use the defaul value set for all connections
+ _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
_failoverPolicy = new FailoverPolicy(connectionURL, this);
@@ -1469,6 +1504,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _syncPersistence;
}
+ /**
+ * Indicates whether we need to sync on every message ack
+ */
+ public boolean getSyncAck()
+ {
+ return _syncAck;
+ }
+
+ public String getSyncPublish()
+ {
+ return _syncPublish;
+ }
+
public void setIdleTimeout(long l)
{
_delegate.setIdleTimeout(l);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 29f1aec2f5..c2fb05d94e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -239,12 +239,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_conn.failoverPrep();
_qpidConnection.resume();
-
- if (_conn.firePreResubscribe())
- {
- _conn.resubscribeSessions();
- }
-
_conn.fireFailoverComplete();
return;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 08fd49286b..d96544adf8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -22,14 +22,12 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.AMQException;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
-
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,38 +88,11 @@ public class AMQQueueBrowser implements QueueBrowser
{
checkState();
final BasicMessageConsumer consumer =
- (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
_consumers.add(consumer);
- return new Enumeration()
- {
-
- Message _nextMessage = consumer == null ? null : consumer.receive(1000);
-
- public boolean hasMoreElements()
- {
- _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
- return (_nextMessage != null);
- }
-
- public Object nextElement()
- {
- Message msg = _nextMessage;
- try
- {
- _logger.info("QB:nextElement about to receive");
- _nextMessage = consumer.receive(1000);
- _logger.info("QB:nextElement received:" + _nextMessage);
- }
- catch (JMSException e)
- {
- _logger.warn("Exception caught while queue browsing", e);
- _nextMessage = null;
- }
- return msg;
- }
- };
+ return new QueueBrowserEnumeration(consumer);
}
public void close() throws JMSException
@@ -134,4 +105,39 @@ public class AMQQueueBrowser implements QueueBrowser
_consumers.clear();
}
+ private class QueueBrowserEnumeration implements Enumeration
+ {
+ Message _nextMessage;
+ private BasicMessageConsumer _consumer;
+
+ public QueueBrowserEnumeration(BasicMessageConsumer consumer) throws JMSException
+ {
+ _nextMessage = consumer == null ? null : consumer.receiveBrowse();
+ _logger.info("QB:created with first element:" + _nextMessage);
+ _consumer = consumer;
+ }
+
+ public boolean hasMoreElements()
+ {
+ _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ return (_nextMessage != null);
+ }
+
+ public Object nextElement()
+ {
+ Message msg = _nextMessage;
+ try
+ {
+ _logger.info("QB:nextElement about to receive");
+ _nextMessage = _consumer.receiveBrowse();
+ _logger.info("QB:nextElement received:" + _nextMessage);
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("Exception caught while queue browsing", e);
+ _nextMessage = null;
+ }
+ return msg;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 733bee2d81..9012632adf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -575,12 +575,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
{
+ bindQueue(queueName, routingKey, arguments, exchangeName, destination, false);
+ }
+
+ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+ final AMQShortString exchangeName, final AMQDestination destination,
+ final boolean nowait) throws AMQException
+ {
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
+ sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait);
return null;
}
}, _connection).execute();
@@ -595,7 +602,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
+ final AMQShortString exchangeName, AMQDestination destination,
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Closes the session.
@@ -1815,6 +1823,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void failoverPrep()
{
startDispatcherIfNecessary();
+ syncDispatchQueue();
+ }
+
+ void syncDispatchQueue()
+ {
final CountDownLatch signal = new CountDownLatch(1);
_queue.add(new Dispatchable() {
public void dispatch(AMQSession ssn)
@@ -1828,7 +1841,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (InterruptedException e)
{
- // pass
+ throw new RuntimeException(e);
}
}
@@ -1859,6 +1872,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_inRecovery = inRecovery;
}
+ boolean isStarted()
+ {
+ return _startedAtLeastOnce.get();
+ }
+
/**
* Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
*
@@ -2281,7 +2299,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal) throws AMQException
+ {
+ return declareQueue(amqd, protocolHandler, noLocal, false);
+ }
+
+ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean noLocal, final boolean nowait)
throws AMQException
{
/*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -2296,14 +2320,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
amqd.setQueueName(protocolHandler.generateQueueName());
}
- sendQueueDeclare(amqd, protocolHandler);
+ sendQueueDeclare(amqd, protocolHandler, nowait);
return amqd.getAMQQueueName();
}
}, _connection).execute();
}
- public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
+ public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Undeclares the specified queue.
@@ -2416,14 +2441,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler, false);
+ declareExchange(amqd, protocolHandler, nowait);
- AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal());
+ AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
// store the consumer queue name
consumer.setQueuename(queueName);
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
@@ -2455,11 +2480,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
- }
- catch (JMSException e) // thrown by getMessageSelector
- {
- throw new AMQException(null, e.getMessage(), e);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
}
catch (FailoverException e)
{
@@ -2531,8 +2552,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
for (C consumer : consumers)
{
- consumer.failedOver();
+ consumer.failedOverPre();
registerConsumer(consumer, true);
+ consumer.failedOverPost();
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index a8487b04e9..34457d745f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -135,7 +135,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
try
{
- flushAcknowledgments();
+ flushAcknowledgments(true);
}
catch (Throwable t)
{
@@ -236,12 +236,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
void flushAcknowledgments()
{
+ flushAcknowledgments(false);
+ }
+
+ void flushAcknowledgments(boolean setSyncBit)
+ {
synchronized (unacked)
{
if (unackedCount > 0)
{
messageAcknowledge
- (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
clearUnacked();
}
}
@@ -249,6 +254,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
void messageAcknowledge(RangeSet ranges, boolean accept)
{
+ messageAcknowledge(ranges,accept,false);
+ }
+
+ void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+ {
Session ssn = getQpidSession();
for (Range range : ranges)
{
@@ -257,7 +267,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
ssn.flushProcessed(accept ? BATCH : NONE);
if (accept)
{
- ssn.messageAccept(ranges, UNRELIABLE);
+ ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
}
}
@@ -272,7 +282,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* @param arguments 0_8 specific
*/
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
- final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
+ final FieldTable arguments, final AMQShortString exchangeName,
+ final AMQDestination destination, final boolean nowait)
throws AMQException, FailoverException
{
Map args = FiledTableSupport.convertToMap(arguments);
@@ -287,9 +298,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
}
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
}
@@ -501,18 +515,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
}
- getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+ Option.UNRELIABLE);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
- if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
+ if(prefetch() && (isStarted() || _immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch());
+ getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
+ }
+
+ if (!nowait)
+ {
+ getQpidSession().sync();
+ getCurrentException();
}
- getQpidSession().sync();
- getCurrentException();
}
/**
@@ -540,14 +560,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
null,
name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ getQpidSession().sync();
+ getCurrentException();
+ }
}
/**
* Declare a queue with the given queueName
*/
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait)
throws AMQException, FailoverException
{
// do nothing this is only used by 0_8
@@ -557,7 +581,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Declare a queue with the given queueName
*/
public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal, final boolean nowait)
throws AMQException, FailoverException
{
AMQShortString res;
@@ -581,9 +605,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.isDurable() ? Option.DURABLE : Option.NONE,
!amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
return res;
}
@@ -609,7 +636,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ Option.UNRELIABLE);
}
}
else
@@ -625,17 +653,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (consumer.getMessageListener() != null)
{
getQpidSession().messageFlow(consumerTag,
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
else
{
getQpidSession()
.messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch());
+ getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
}
getQpidSession()
- .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+ Option.UNRELIABLE);
}
catch (Exception e)
{
@@ -700,6 +731,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void opened(Session ssn) {}
+ public void resumed(Session ssn)
+ {
+ _qpidConnection = ssn.getConnection();
+ try
+ {
+ resubscribe();
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
public void message(Session ssn, MessageTransfer xfr)
{
messageReceived(new UnprocessedMessage_0_10(xfr));
@@ -716,7 +760,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void closed(Session ssn) {}
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal, final boolean nowait)
throws AMQException
{
/*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -736,34 +780,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal);
+ return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
}
}, _connection).execute();
}
-
- void start() throws AMQException
- {
- super.start();
- for(BasicMessageConsumer c: _consumers.values())
- {
- c.start();
- }
- }
-
-
- void stop() throws AMQException
- {
- super.stop();
- for(BasicMessageConsumer c: _consumers.values())
- {
- c.stop();
- }
- }
-
-
-
-
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 6451ae60be..ff8631c12e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -106,7 +106,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
+ final AMQShortString exchangeName, final AMQDestination dest,
+ final boolean nowait) throws AMQException, FailoverException
{
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
(getTicket(),queueName,exchangeName,routingKey,false,arguments).
@@ -300,13 +301,14 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
{
ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
name.toString().startsWith("amq."),
- false,false,false,nowait,null);
+ false,false,false,false,null);
AMQFrame exchangeDeclare = body.generateFrame(_channelId);
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException
{
QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 76422c6297..2bb443a090 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -441,7 +441,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
o = _synchronousQueue.take();
}
return o;
- }
+ }
+
+ abstract Message receiveBrowse() throws JMSException;
public Message receiveNoWait() throws JMSException
{
@@ -1037,23 +1039,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_synchronousQueue.clear();
}
- public void start()
- {
- // do nothing as this is a 0_10 feature
- }
-
-
- public void stop()
- {
- // do nothing as this is a 0_10 feature
- }
-
- public boolean isStrated()
- {
- // do nothing as this is a 0_10 feature
- return false;
- }
-
public AMQShortString getQueuename()
{
return _queuename;
@@ -1070,10 +1055,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
/** to be called when a failover has occured */
- public void failedOver()
+ public void failedOverPre()
{
clearReceiveQueue();
// TGM FIXME: think this should just be removed
// clearUnackedMessages();
}
+
+ public void failedOverPost() {}
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 7d535643c0..8b17dcf91f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -31,6 +31,7 @@ import org.apache.qpid.filter.JMSSelectorFilter;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,7 +149,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (isMessageListenerSet() && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
@@ -246,7 +248,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if(! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
// now we need to acquire this message if needed
@@ -258,9 +261,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_logger.debug("filterMessage - trying to acquire message");
}
messageOk = acquireMessage(message);
- _logger.debug("filterMessage - *************************************");
_logger.debug("filterMessage - message acquire status : " + messageOk);
- _logger.debug("filterMessage - *************************************");
}
return messageOk;
}
@@ -335,7 +336,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (messageListener != null && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -349,26 +351,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
- public boolean isStrated()
+ public void failedOverPost()
{
- return _isStarted;
- }
-
- public void start()
- {
- _isStarted = true;
- if (_syncReceive.get())
+ if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
- public void stop()
- {
- _isStarted = false;
- }
-
/**
* When messages are not prefetched we need to request a message from the
* broker.
@@ -380,16 +372,35 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
- }
if (! getSession().prefetch())
{
_syncReceive.set(true);
}
+ if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
Object o = super.getMessageFromQueue(l);
+ if (o == null && _0_10session.isStarted())
+ {
+ _0_10session.getQpidSession().messageFlush
+ (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
+ _0_10session.getQpidSession().sync();
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.BYTE,
+ 0xFFFFFFFF, Option.UNRELIABLE);
+ if (getSession().prefetch())
+ {
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE,
+ _0_10session.getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
+ }
+ _0_10session.syncDispatchQueue();
+ o = super.getMessageFromQueue(-1);
+ }
if (! getSession().prefetch())
{
_syncReceive.set(false);
@@ -404,6 +415,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
+
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
+ !_session.isInRecovery() &&
+ _session.getAMQConnection().getSyncAck())
+ {
+ ((AMQSession_0_10) getSession()).flushAcknowledgments();
+ ((AMQSession_0_10) getSession()).getQpidSession().sync();
+ }
+ }
+
+ Message receiveBrowse() throws JMSException
+ {
+ return receiveNoWait();
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 494a8fb43d..308f04f082 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.Message;
import org.apache.qpid.AMQException;
import org.apache.qpid.QpidException;
@@ -38,9 +39,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
protected final Logger _logger = LoggerFactory.getLogger(getClass());
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
@@ -73,13 +74,18 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
- {
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
+ {
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ messageFrame.isRedelivered(), messageFrame.getExchange(),
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+
+ }
+ Message receiveBrowse() throws JMSException
+ {
+ return receive();
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 954a3bc28f..5ff6066ddc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
+ enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
+
protected final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQConnection _connection;
@@ -120,6 +122,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
protected String _userID; // ref user id used in the connection.
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+
+ protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
@@ -141,6 +145,26 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
_userID = connection.getUsername();
+ setPublishMode();
+ }
+
+ void setPublishMode()
+ {
+ // Publish mode could be configured at destination level as well.
+ // Will add support for this when we provide a more robust binding URL
+
+ String syncPub = _connection.getSyncPublish();
+ // Support for deprecated option sync_persistence
+ if (syncPub.equals("persistent") || _connection.getSyncPersistence())
+ {
+ publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
+ }
+ else if (syncPub.equals("all"))
+ {
+ publishMode = PublishMode.SYNC_PUBLISH_ALL;
+ }
+
+ _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
}
void resubscribe() throws AMQException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 4e5077f0cd..b8c5fc8faf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -151,9 +151,13 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
((AMQSession_0_10) getSession()).getQpidSession();
// if true, we need to sync the delivery of this message
- boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
- getSession().getAMQConnection().getSyncPersistence());
+ boolean sync = false;
+ sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) ||
+ (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT &&
+ deliveryMode == DeliveryMode.PERSISTENT)
+ );
+
org.apache.mina.common.ByteBuffer data = message.getData();
ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
index 986154cda8..3627618e68 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -47,6 +47,19 @@ public class ClientProperties
*/
public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
+ /**
+ * When true a sync command is sent after sending a message ack.
+ * type: boolean
+ */
+ public static final String SYNC_ACK_PROP_NAME = "sync_ack";
+
+ /**
+ * sync_publish property - {persistent|all}
+ * If set to 'persistent',then persistent messages will be publish synchronously
+ * If set to 'all', then all messages regardless of the delivery mode will be
+ * published synchronously.
+ */
+ public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish";
/**
* This value will be used in the following settings
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 39b9597af1..56e9a5dc73 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -25,8 +25,6 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
@@ -35,8 +33,6 @@ import javax.jms.ObjectMessage;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
@@ -157,7 +153,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
finally
{
- _data.rewind();
+ // _data.rewind();
close(in);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
index fab95f754c..f3f74dd332 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
@@ -58,22 +58,31 @@ public class URLParser
if ((connection.getHost() == null) || connection.getHost().equals(""))
{
- String uid = AMQConnectionFactory.getUniqueClientID();
- if (uid == null)
- {
- throw URLHelper.parseError(-1, "Client Name not specified", fullURL);
+ String tmp = connection.getAuthority();
+ // hack to read a clientid such as "my_clientID"
+ if (tmp != null && tmp.indexOf('@') < tmp.length()-1)
+ {
+ _url.setClientName(tmp.substring(tmp.indexOf('@')+1,tmp.length()));
}
else
{
- _url.setClientName(uid);
+ String uid = AMQConnectionFactory.getUniqueClientID();
+ if (uid == null)
+ {
+ throw URLHelper.parseError(-1, "Client Name not specified", fullURL);
+ }
+ else
+ {
+ _url.setClientName(uid);
+ }
}
- }
+ }
else
{
_url.setClientName(connection.getHost());
}
-
+
String userInfo = connection.getUserInfo();
if (userInfo == null)