diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 117 |
1 files changed, 69 insertions, 48 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index a8487b04e9..34457d745f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -135,7 +135,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { try { - flushAcknowledgments(); + flushAcknowledgments(true); } catch (Throwable t) { @@ -236,12 +236,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic void flushAcknowledgments() { + flushAcknowledgments(false); + } + + void flushAcknowledgments(boolean setSyncBit) + { synchronized (unacked) { if (unackedCount > 0) { messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); clearUnacked(); } } @@ -249,6 +254,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic void messageAcknowledge(RangeSet ranges, boolean accept) { + messageAcknowledge(ranges,accept,false); + } + + void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit) + { Session ssn = getQpidSession(); for (Range range : ranges) { @@ -257,7 +267,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic ssn.flushProcessed(accept ? BATCH : NONE); if (accept) { - ssn.messageAccept(ranges, UNRELIABLE); + ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE); } } @@ -272,7 +282,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param arguments 0_8 specific */ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, - final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) + final FieldTable arguments, final AMQShortString exchangeName, + final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { Map args = FiledTableSupport.convertToMap(arguments); @@ -287,9 +298,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); } - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } } @@ -501,18 +515,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); // We need to sync so that we get notify of an error. // only if not immediat prefetch - if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) + if(prefetch() && (isStarted() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch()); + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); } - getQpidSession().sync(); - getCurrentException(); } /** @@ -540,14 +560,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic null, name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); + } } /** * Declare a queue with the given queueName */ - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException { // do nothing this is only used by 0_8 @@ -557,7 +581,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal, final boolean nowait) throws AMQException, FailoverException { AMQShortString res; @@ -581,9 +605,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.isDurable() ? Option.DURABLE : Option.NONE, !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } return res; } @@ -609,7 +636,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag())); + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); } } else @@ -625,17 +653,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (consumer.getMessageListener() != null) { getQpidSession().messageFlow(consumerTag, - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } else { getQpidSession() .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch()); + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); } getQpidSession() - .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); + .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); } catch (Exception e) { @@ -700,6 +731,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void opened(Session ssn) {} + public void resumed(Session ssn) + { + _qpidConnection = ssn.getConnection(); + try + { + resubscribe(); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + public void message(Session ssn, MessageTransfer xfr) { messageReceived(new UnprocessedMessage_0_10(xfr)); @@ -716,7 +760,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void closed(Session ssn) {} protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal, final boolean nowait) throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ @@ -736,34 +780,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal); + return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); } }, _connection).execute(); } - - void start() throws AMQException - { - super.start(); - for(BasicMessageConsumer c: _consumers.values()) - { - c.start(); - } - } - - - void stop() throws AMQException - { - super.stop(); - for(BasicMessageConsumer c: _consumers.values()) - { - c.stop(); - } - } - - - - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { |