summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java117
1 files changed, 69 insertions, 48 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index a8487b04e9..34457d745f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -135,7 +135,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
try
{
- flushAcknowledgments();
+ flushAcknowledgments(true);
}
catch (Throwable t)
{
@@ -236,12 +236,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
void flushAcknowledgments()
{
+ flushAcknowledgments(false);
+ }
+
+ void flushAcknowledgments(boolean setSyncBit)
+ {
synchronized (unacked)
{
if (unackedCount > 0)
{
messageAcknowledge
- (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
clearUnacked();
}
}
@@ -249,6 +254,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
void messageAcknowledge(RangeSet ranges, boolean accept)
{
+ messageAcknowledge(ranges,accept,false);
+ }
+
+ void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+ {
Session ssn = getQpidSession();
for (Range range : ranges)
{
@@ -257,7 +267,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
ssn.flushProcessed(accept ? BATCH : NONE);
if (accept)
{
- ssn.messageAccept(ranges, UNRELIABLE);
+ ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
}
}
@@ -272,7 +282,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* @param arguments 0_8 specific
*/
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
- final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
+ final FieldTable arguments, final AMQShortString exchangeName,
+ final AMQDestination destination, final boolean nowait)
throws AMQException, FailoverException
{
Map args = FiledTableSupport.convertToMap(arguments);
@@ -287,9 +298,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
}
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
}
@@ -501,18 +515,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
}
- getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+ Option.UNRELIABLE);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
- if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
+ if(prefetch() && (isStarted() || _immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch());
+ getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
+ }
+
+ if (!nowait)
+ {
+ getQpidSession().sync();
+ getCurrentException();
}
- getQpidSession().sync();
- getCurrentException();
}
/**
@@ -540,14 +560,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
null,
name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ getQpidSession().sync();
+ getCurrentException();
+ }
}
/**
* Declare a queue with the given queueName
*/
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait)
throws AMQException, FailoverException
{
// do nothing this is only used by 0_8
@@ -557,7 +581,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Declare a queue with the given queueName
*/
public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal, final boolean nowait)
throws AMQException, FailoverException
{
AMQShortString res;
@@ -581,9 +605,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.isDurable() ? Option.DURABLE : Option.NONE,
!amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
return res;
}
@@ -609,7 +636,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ Option.UNRELIABLE);
}
}
else
@@ -625,17 +653,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (consumer.getMessageListener() != null)
{
getQpidSession().messageFlow(consumerTag,
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
else
{
getQpidSession()
.messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch());
+ getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
}
getQpidSession()
- .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+ Option.UNRELIABLE);
}
catch (Exception e)
{
@@ -700,6 +731,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void opened(Session ssn) {}
+ public void resumed(Session ssn)
+ {
+ _qpidConnection = ssn.getConnection();
+ try
+ {
+ resubscribe();
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
public void message(Session ssn, MessageTransfer xfr)
{
messageReceived(new UnprocessedMessage_0_10(xfr));
@@ -716,7 +760,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void closed(Session ssn) {}
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal, final boolean nowait)
throws AMQException
{
/*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -736,34 +780,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal);
+ return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
}
}, _connection).execute();
}
-
- void start() throws AMQException
- {
- super.start();
- for(BasicMessageConsumer c: _consumers.values())
- {
- c.start();
- }
- }
-
-
- void stop() throws AMQException
- {
- super.stop();
- for(BasicMessageConsumer c: _consumers.values())
- {
- c.stop();
- }
- }
-
-
-
-
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{