summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java39
1 files changed, 21 insertions, 18 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 32a0fdd5f5..aa0ff66545 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -52,7 +52,7 @@ import java.util.Map;
/**
* This is a 0.10 Session
*/
-public class AMQSession_0_10 extends AMQSession
+public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
{
/**
@@ -345,7 +345,7 @@ public class AMQSession_0_10 extends AMQSession
/**
* Create an 0_10 message consumer
*/
- public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
final FieldTable ft, final boolean noConsume,
@@ -406,8 +406,8 @@ public class AMQSession_0_10 extends AMQSession
* This method is invoked when a consumer is creted
* Registers the consumer with the broker
*/
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector, AMQShortString tag)
+ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+ boolean nowait, String messageSelector, int tag)
throws AMQException, FailoverException
{
boolean preAcquire;
@@ -416,10 +416,10 @@ public class AMQSession_0_10 extends AMQSession
preAcquire = ( ! consumer.isNoConsume() &&
(consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
|| !(consumer.getDestination() instanceof AMQQueue);
- getQpidSession().messageSubscribe(queueName.toString(), tag.toString(),
+ getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
- new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
+ (BasicMessageConsumer_0_10) consumer, null,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
catch (JMSException e)
@@ -427,21 +427,23 @@ public class AMQSession_0_10 extends AMQSession
throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
}
+ String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+
if (! prefetch())
{
- getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT);
+ getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
}
else
{
- getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW);
+ getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
}
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
{
// set the flow
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
getAMQConnection().getMaxPrefetch());
}
@@ -452,7 +454,7 @@ public class AMQSession_0_10 extends AMQSession
/**
* Create an 0_10 message producer
*/
- public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent,
long producerId)
{
@@ -542,13 +544,14 @@ public class AMQSession_0_10 extends AMQSession
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageStop(consumer.getConsumerTag().toString());
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
}
}
else
{
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer_0_10 consumer : _consumers.values())
{
+ String consumerTag = String.valueOf(consumer.getConsumerTag());
//only set if msg list is null
try
{
@@ -556,18 +559,18 @@ public class AMQSession_0_10 extends AMQSession
{
if (consumer.getMessageListener() != null)
{
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE, 1);
}
}
else
{
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE,
+ .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
getAMQConnection().getMaxPrefetch());
}
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
}
catch (Exception e)
{
@@ -715,7 +718,7 @@ public class AMQSession_0_10 extends AMQSession
AMQTopic origTopic=checkValidTopic(topic);
AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+ TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
if (subscriber != null)
{
if (subscriber.getTopic().equals(topic))
@@ -766,7 +769,7 @@ public class AMQSession_0_10 extends AMQSession
}
}
- subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+ subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);