diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 179 |
1 files changed, 68 insertions, 111 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 86e1fc08de..7e5edef38d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; @@ -74,6 +75,7 @@ import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.SessionListener; import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Serial; +import org.apache.qpid.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -294,23 +296,34 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - void messageAcknowledge(RangeSet ranges, boolean accept) + void messageAcknowledge(final RangeSet ranges, final boolean accept) { messageAcknowledge(ranges,accept,false); } - void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit) + void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit) { - Session ssn = getQpidSession(); - for (Range range : ranges) + final Session ssn = getQpidSession(); + flushProcessed(ranges,accept); + if (accept) { - ssn.processed(range); + ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE); } - ssn.flushProcessed(accept ? BATCH : NONE); - if (accept) + } + + /** + * Flush any outstanding commands. This causes session complete to be sent. + * @param ranges the range of command ids. + * @param batch true if batched. + */ + void flushProcessed(final RangeSet ranges, final boolean batch) + { + final Session ssn = getQpidSession(); + for (final Range range : ranges) { - ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE); + ssn.processed(range); } + ssn.flushProcessed(batch ? BATCH : NONE); } /** @@ -364,7 +377,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _logger.debug("Binding queue : " + queue + " exchange: " + exchange + " using binding key " + binding.getBindingKey() + - " with args " + printMap(binding.getArgs())); + " with args " + Strings.printMap(binding.getArgs())); getQpidSession().exchangeBind(queue, exchange, binding.getBindingKey(), @@ -496,13 +509,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, - final FieldTable ft, final boolean noConsume, + final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal, - _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh, + _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); } @@ -568,56 +581,30 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Registers the consumer with the broker */ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector, int tag) + boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException { - boolean preAcquire; - - long capacity = getCapacity(consumer.getDestination()); - - try - { - boolean isTopic; - Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); - - if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) - { - isTopic = consumer.getDestination() instanceof AMQTopic || - consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ; - - preAcquire = isTopic || (!consumer.isNoConsume() && - (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals(""))); - } - else - { - isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE; - - preAcquire = !consumer.isNoConsume() && - (isTopic || consumer.getMessageSelector() == null || - consumer.getMessageSelector().equals("")); - - arguments.putAll( - (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); - } - - boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; - - if (consumer.getDestination().getLink() != null) - { - acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE; - } - - getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), - acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, - preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); - } - catch (JMSException e) + boolean preAcquire = consumer.isPreAcquire(); + + AMQDestination destination = consumer.getDestination(); + long capacity = consumer.getCapacity(); + + Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + + Link link = destination.getLink(); + if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) { - throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); + arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs()); } + boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; + + getQpidSession().messageSubscribe + (queueName.toString(), String.valueOf(tag), + acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); if (capacity == 0) @@ -646,21 +633,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private long getCapacity(AMQDestination destination) - { - long capacity = 0; - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (prefetch()) - { - capacity = getAMQConnection().getMaxPrefetch(); - } - return capacity; - } - /** * Create an 0_10 message producer */ @@ -825,7 +797,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic //only set if msg list is null try { - long capacity = getCapacity(consumer.getDestination()); + long capacity = consumer.getCapacity(); if (capacity == 0) { @@ -969,17 +941,23 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Store non committed messages for this session - * With 0.10 messages are consumed with window mode, we must send a completion - * before the window size is reached so credits don't dry up. * @param id */ @Override protected void addDeliveredMessage(long id) { _txRangeSet.add((int) id); _txSize++; + } + + /** + * With 0.10 messages are consumed with window mode, we must send a completion + * before the window size is reached so credits don't dry up. + */ + protected void sendTxCompletionsIfNecessary() + { // this is a heuristic, we may want to have that configurable - if (_connection.getMaxPrefetch() == 1 || - _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0) + if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 || + _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)) { // send completed so consumer credits don't dry up messageAcknowledge(_txRangeSet, false); @@ -1168,8 +1146,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic boolean isConsumer, boolean noWait) throws AMQException { - if (dest.isAddressResolved()) - { + if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime())) + { if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) { createSubscriptionQueue(dest); @@ -1189,22 +1167,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic int type = resolveAddressType(dest); - if (type == AMQDestination.QUEUE_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.AT_LEAST_ONCE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.UNRELIABLE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE) - { - throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics"); - } - switch (type) { case AMQDestination.QUEUE_TYPE: @@ -1258,7 +1220,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic "The name '" + dest.getAddressName() + "' supplied in the address doesn't resolve to an exchange or a queue"); } - dest.setAddressResolved(true); + dest.setAddressResolved(System.currentTimeMillis()); } } @@ -1352,22 +1314,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setRoutingKey(new AMQShortString(dest.getSubject())); } - /** This should be moved to a suitable utility class */ - private String printMap(Map<String,Object> map) - { - StringBuilder sb = new StringBuilder(); - sb.append("<"); - if (map != null) - { - for(String key : map.keySet()) - { - sb.append(key).append(" = ").append(map.get(key)).append(" "); - } - } - sb.append(">"); - return sb.toString(); - } - protected void acknowledgeImpl() { RangeSet range = gatherUnackedRangeSet(); @@ -1378,4 +1324,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().sync(); } } + + @Override + void resubscribe() throws AMQException + { + // Also reset the delivery tag tracker, to insure we dont + // return the first <total number of msgs received on session> + // messages sent by the brokers following the first rollback + // after failover + _highestDeliveryTag.set(-1); + super.resubscribe(); + } } |