diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 149 |
1 files changed, 96 insertions, 53 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 4c3d768020..aa0ff66545 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,17 +27,18 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.util.Serial; -import org.apache.qpidity.nclient.Session; -import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; -import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.transport.MessageCreditUnit; -import org.apache.qpidity.transport.MessageFlowMode; -import org.apache.qpidity.transport.RangeSet; -import org.apache.qpidity.transport.Option; -import org.apache.qpidity.transport.ExchangeBoundResult; -import org.apache.qpidity.transport.Future; +import org.apache.qpid.nclient.Session; +import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.ErrorCode; +import org.apache.qpid.QpidException; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +52,7 @@ import java.util.Map; /** * This is a 0.10 Session */ -public class AMQSession_0_10 extends AMQSession +public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10> { /** @@ -71,8 +72,11 @@ public class AMQSession_0_10 extends AMQSession private Object _currentExceptionLock = new Object(); private QpidException _currentException; - // a ref on the qpidity connection - protected org.apache.qpidity.nclient.Connection _qpidConnection; + // a ref on the qpid connection + protected org.apache.qpid.nclient.Connection _qpidConnection; + + private RangeSet unacked = new RangeSet(); + private int unackedCount = 0; /** * USed to store the range of in tx messages @@ -93,7 +97,7 @@ public class AMQSession_0_10 extends AMQSession * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. * @param qpidConnection The qpid connection */ - AMQSession_0_10(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId, + AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { @@ -123,7 +127,7 @@ public class AMQSession_0_10 extends AMQSession * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. * @param qpidConnection The connection */ - AMQSession_0_10(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId, + AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { @@ -131,6 +135,18 @@ public class AMQSession_0_10 extends AMQSession defaultPrefetchHigh, defaultPrefetchLow); } + private void addUnacked(int id) + { + unacked.add(id); + unackedCount++; + } + + private void clearUnacked() + { + unacked.clear(); + unackedCount = 0; + } + //------- overwritten methods of class AMQSession /** @@ -140,6 +156,7 @@ public class AMQSession_0_10 extends AMQSession * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the * delivery tag, <tt>false</tt> to just acknowledge that message. */ + public void acknowledgeMessage(long deliveryTag, boolean multiple) { if (_logger.isDebugEnabled()) @@ -147,14 +164,13 @@ public class AMQSession_0_10 extends AMQSession _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId); } // acknowledge this message - RangeSet ranges = new RangeSet(); if (multiple) { for (Long messageTag : _unacknowledgedMessageTags) { if( messageTag <= deliveryTag ) { - ranges.add((int) (long) messageTag); + addUnacked(messageTag.intValue()); _unacknowledgedMessageTags.remove(messageTag); } } @@ -163,10 +179,26 @@ public class AMQSession_0_10 extends AMQSession } else { - ranges.add((int) deliveryTag); + addUnacked((int) deliveryTag); _unacknowledgedMessageTags.remove(deliveryTag); } - getQpidSession().messageAcknowledge(ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + + long prefetch = getAMQConnection().getMaxPrefetch(); + + if (unackedCount >= prefetch/2) + { + flushAcknowledgments(); + } + } + + void flushAcknowledgments() + { + if (unackedCount > 0) + { + getQpidSession().messageAcknowledge + (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + clearUnacked(); + } } /** @@ -210,6 +242,7 @@ public class AMQSession_0_10 extends AMQSession */ public void sendClose(long timeout) throws AMQException, FailoverException { + flushAcknowledgments(); getQpidSession().sync(); getQpidSession().close(); getCurrentException(); @@ -243,15 +276,16 @@ public class AMQSession_0_10 extends AMQSession * @param durable If set when creating a new queue, * the queue will be marked as durable. * @param exclusive Exclusive queues can only be used from one connection at a time. + * @param arguments Exclusive queues can only be used from one connection at a time. * @throws AMQException * @throws FailoverException */ public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive) throws AMQException, FailoverException + final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException { - getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION, - autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION, - exclusive ? Option.EXCLUSIVE : Option.NO_OPTION); + getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE, + autoDelete ? Option.AUTO_DELETE : Option.NONE, + exclusive ? Option.EXCLUSIVE : Option.NONE); // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); @@ -311,7 +345,7 @@ public class AMQSession_0_10 extends AMQSession /** * Create an 0_10 message consumer */ - public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft, final boolean noConsume, @@ -372,8 +406,8 @@ public class AMQSession_0_10 extends AMQSession * This method is invoked when a consumer is creted * Registers the consumer with the broker */ - public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector, AMQShortString tag) + public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, + boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException { boolean preAcquire; @@ -382,32 +416,34 @@ public class AMQSession_0_10 extends AMQSession preAcquire = ( ! consumer.isNoConsume() && (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) ) || !(consumer.getDestination() instanceof AMQQueue); - getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), + getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED, preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, - new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + (BasicMessageConsumer_0_10) consumer, null, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) { throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); } + String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); + if (! prefetch()) { - getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT); + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT); } else { - getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW); + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) { // set the flow - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, getAMQConnection().getMaxPrefetch()); } @@ -418,7 +454,7 @@ public class AMQSession_0_10 extends AMQSession /** * Create an 0_10 message producer */ - public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId) { @@ -476,9 +512,9 @@ public class AMQSession_0_10 extends AMQSession arguments.put("no-local", true); } getQpidSession().queueDeclare(res.toString(), null, arguments, - amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION, - amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION, - !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + amqd.isDurable() ? Option.DURABLE : Option.NONE, + !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false // We need to sync so that we get notify of an error. getQpidSession().sync(); @@ -508,13 +544,14 @@ public class AMQSession_0_10 extends AMQSession { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageStop(consumer.getConsumerTag().toString()); + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag())); } } else { - for (BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer_0_10 consumer : _consumers.values()) { + String consumerTag = String.valueOf(consumer.getConsumerTag()); //only set if msg list is null try { @@ -522,18 +559,18 @@ public class AMQSession_0_10 extends AMQSession { if (consumer.getMessageListener() != null) { - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, 1); } } else { getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, + .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, getAMQConnection().getMaxPrefetch()); } getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); + .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); } catch (Exception e) { @@ -561,7 +598,7 @@ public class AMQSession_0_10 extends AMQSession * * @return The associated Qpid Session. */ - protected org.apache.qpidity.nclient.Session getQpidSession() + protected org.apache.qpid.nclient.Session getQpidSession() { return _qpidSession; } @@ -594,7 +631,7 @@ public class AMQSession_0_10 extends AMQSession try { // this is done so that we can produce to a temporary queue beofre we create a consumer - sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); + sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive(),null); sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result); result.setQueueName(result.getRoutingKey()); } @@ -612,7 +649,7 @@ public class AMQSession_0_10 extends AMQSession /** * Lstener for qpid protocol exceptions */ - private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener + private class QpidSessionExceptionListener implements org.apache.qpid.nclient.ClosedListener { public void onClosed(ErrorCode errorCode, String reason, Throwable t) { @@ -681,7 +718,7 @@ public class AMQSession_0_10 extends AMQSession AMQTopic origTopic=checkValidTopic(topic); AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber=_subscriptions.get(name); + TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name); if (subscriber != null) { if (subscriber.getTopic().equals(topic)) @@ -732,7 +769,7 @@ public class AMQSession_0_10 extends AMQSession } } - subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest)); _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); @@ -740,7 +777,7 @@ public class AMQSession_0_10 extends AMQSession return subscriber; } - Long requestQueueDepth(AMQDestination amqd) + protected Long requestQueueDepth(AMQDestination amqd) { return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); } @@ -757,10 +794,11 @@ public class AMQSession_0_10 extends AMQSession _txRangeSet.add((int) id); _txSize++; // this is a heuristic, we may want to have that configurable - if( _txSize > _connection.getMaxPrefetch() / 2 ) + if (_connection.getMaxPrefetch() == 1 || + _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0) { - // send completed so consumer credits don't dry up - getQpidSession().messageAcknowledge(_txRangeSet, false); + // send completed so consumer credits don't dry up + getQpidSession().messageAcknowledge(_txRangeSet, false); } } @@ -787,14 +825,19 @@ public class AMQSession_0_10 extends AMQSession } } - final boolean tagLE(long tag1, long tag2) + protected final boolean tagLE(long tag1, long tag2) { return Serial.le((int) tag1, (int) tag2); } - final boolean updateRollbackMark(long currentMark, long deliveryTag) + protected final boolean updateRollbackMark(long currentMark, long deliveryTag) { return Serial.lt((int) currentMark, (int) deliveryTag); } + public AMQMessageDelegateFactory getMessageDelegateFactory() + { + return AMQMessageDelegateFactory.FACTORY_0_10; + } + } |