diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 39 |
1 files changed, 21 insertions, 18 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 32a0fdd5f5..aa0ff66545 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -52,7 +52,7 @@ import java.util.Map; /** * This is a 0.10 Session */ -public class AMQSession_0_10 extends AMQSession +public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10> { /** @@ -345,7 +345,7 @@ public class AMQSession_0_10 extends AMQSession /** * Create an 0_10 message consumer */ - public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft, final boolean noConsume, @@ -406,8 +406,8 @@ public class AMQSession_0_10 extends AMQSession * This method is invoked when a consumer is creted * Registers the consumer with the broker */ - public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector, AMQShortString tag) + public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, + boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException { boolean preAcquire; @@ -416,10 +416,10 @@ public class AMQSession_0_10 extends AMQSession preAcquire = ( ! consumer.isNoConsume() && (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) ) || !(consumer.getDestination() instanceof AMQQueue); - getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), + getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED, preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, - new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, + (BasicMessageConsumer_0_10) consumer, null, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) @@ -427,21 +427,23 @@ public class AMQSession_0_10 extends AMQSession throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); } + String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); + if (! prefetch()) { - getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT); + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT); } else { - getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW); + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) { // set the flow - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, getAMQConnection().getMaxPrefetch()); } @@ -452,7 +454,7 @@ public class AMQSession_0_10 extends AMQSession /** * Create an 0_10 message producer */ - public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId) { @@ -542,13 +544,14 @@ public class AMQSession_0_10 extends AMQSession { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageStop(consumer.getConsumerTag().toString()); + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag())); } } else { - for (BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer_0_10 consumer : _consumers.values()) { + String consumerTag = String.valueOf(consumer.getConsumerTag()); //only set if msg list is null try { @@ -556,18 +559,18 @@ public class AMQSession_0_10 extends AMQSession { if (consumer.getMessageListener() != null) { - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, 1); } } else { getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, + .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, getAMQConnection().getMaxPrefetch()); } getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); + .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); } catch (Exception e) { @@ -715,7 +718,7 @@ public class AMQSession_0_10 extends AMQSession AMQTopic origTopic=checkValidTopic(topic); AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber=_subscriptions.get(name); + TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name); if (subscriber != null) { if (subscriber.getTopic().equals(topic)) @@ -766,7 +769,7 @@ public class AMQSession_0_10 extends AMQSession } } - subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest)); _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); |