diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 166 |
1 files changed, 110 insertions, 56 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8395c8f4b7..3902c726f3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -17,11 +17,6 @@ */ package org.apache.qpid.client; -import static org.apache.qpid.transport.Option.BATCH; -import static org.apache.qpid.transport.Option.NONE; -import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; - import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; @@ -34,10 +29,8 @@ import java.util.Timer; import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; - import javax.jms.Destination; import javax.jms.JMSException; - import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -55,11 +48,14 @@ import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; +import static org.apache.qpid.transport.Option.BATCH; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; import org.slf4j.Logger; @@ -78,6 +74,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); private static Timer timer = new Timer("ack-flusher", true); + private static class Flusher extends TimerTask { @@ -120,7 +117,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private AMQException _currentException; // a ref on the qpid connection - protected org.apache.qpid.transport.Connection _qpidConnection; + private org.apache.qpid.transport.Connection _qpidConnection; private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000); private TimerTask flushTask = null; @@ -163,7 +160,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _qpidSession = _qpidConnection.createSession(name,1); } _qpidSession.setSessionListener(this); - if (_transacted) + if (isTransacted()) { _qpidSession.txSelect(); _qpidSession.setTransacted(true); @@ -214,6 +211,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + protected Connection getQpidConnection() + { + return _qpidConnection; + } + //------- overwritten methods of class AMQSession void failoverPrep() @@ -234,17 +236,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (_logger.isDebugEnabled()) { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId); + _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + getChannelId()); } // acknowledge this message if (multiple) { - for (Long messageTag : _unacknowledgedMessageTags) + for (Long messageTag : getUnacknowledgedMessageTags()) { if( messageTag <= deliveryTag ) { addUnacked(messageTag.intValue()); - _unacknowledgedMessageTags.remove(messageTag); + getUnacknowledgedMessageTags().remove(messageTag); } } //empty the list of unack messages @@ -253,12 +255,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic else { addUnacked((int) deliveryTag); - _unacknowledgedMessageTags.remove(deliveryTag); + getUnacknowledgedMessageTags().remove(deliveryTag); } long prefetch = getAMQConnection().getMaxPrefetch(); - if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE) + if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || getAcknowledgeMode() == javax.jms.Session.AUTO_ACKNOWLEDGE) { flushAcknowledgments(); } @@ -276,7 +278,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (unackedCount > 0) { messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); + (unacked, getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); clearUnacked(); } } @@ -444,8 +446,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { // release all unacked messages RangeSet all = RangeSetFactory.createRangeSet(); - RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); - RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); + RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) { Range range = deliveredIter.next(); @@ -526,9 +528,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal, - _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh, - prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); + return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, + getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh, + prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose); } /** @@ -593,7 +595,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Registers the consumer with the broker */ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, MessageFilter messageSelector, int tag) + boolean nowait, int tag) throws AMQException, FailoverException { boolean preAcquire = consumer.isPreAcquire(); @@ -630,7 +632,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) + if(capacity > 0 && getDispatcher() != null && (isStarted() || isImmediatePrefetch())) { // set the flow getQpidSession().messageFlow(consumerTag, @@ -648,12 +650,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Create an 0_10 message producer */ - public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final long producerId) throws JMSException + public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final Boolean mandatory, + final Boolean immediate, final long producerId) throws JMSException { try { - return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this, + return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this, getProtocolHandler(), producerId, immediate, mandatory); } catch (AMQException e) @@ -719,7 +721,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) + final boolean nowait, boolean passive) throws AMQException, FailoverException { // do nothing this is only used by 0_8 @@ -729,7 +731,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait) + final boolean noLocal, final boolean nowait, boolean passive) throws AMQException { AMQShortString queueName; @@ -755,13 +757,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, - amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE, + passive ? Option.PASSIVE : Option.NONE); } else { QueueNode node = (QueueNode)amqd.getSourceNode(); + Map<String,Object> arguments = new HashMap<String,Object>(); + arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs()); + if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() , - node.getDeclareArgs(), + arguments, node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, node.isDurable() ? Option.DURABLE : Option.NONE, node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); @@ -795,15 +804,16 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (suspend) { - for (BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer consumer : getConsumers().values()) { getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), Option.UNRELIABLE); } + sync(); } else { - for (BasicMessageConsumer_0_10 consumer : _consumers.values()) + for (BasicMessageConsumer_0_10 consumer : getConsumers().values()) { String consumerTag = String.valueOf(consumer.getConsumerTag()); //only set if msg list is null @@ -918,11 +928,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return getCurrentException(); } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait) + protected AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new FailoverNoopSupport<AMQShortString, AMQException>( new FailoverProtectedOperation<AMQShortString, AMQException>() { @@ -939,14 +950,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); + return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive); } - }, _connection).execute(); + }, getAMQConnection()).execute(); } - protected Long requestQueueDepth(AMQDestination amqd) + protected Long requestQueueDepth(AMQDestination amqd, boolean sync) { flushAcknowledgments(); + if (sync) + { + getQpidSession().sync(); + } return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); } @@ -968,8 +983,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic protected void sendTxCompletionsIfNecessary() { // this is a heuristic, we may want to have that configurable - if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 || - _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)) + if (_txSize > 0 && (getAMQConnection().getMaxPrefetch() == 1 || + getAMQConnection().getMaxPrefetch() != 0 && _txSize % (getAMQConnection().getMaxPrefetch() / 2) == 0)) { // send completed so consumer credits don't dry up messageAcknowledge(_txRangeSet, false); @@ -1039,7 +1054,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); _currentException = amqe; } - _connection.exceptionReceived(_currentException); + getAMQConnection().exceptionReceived(_currentException); } public AMQMessageDelegateFactory getMessageDelegateFactory() @@ -1156,13 +1171,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic @SuppressWarnings("deprecation") public void handleAddressBasedDestination(AMQDestination dest, boolean isConsumer, + boolean noLocal, boolean noWait) throws AMQException { - if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime())) + if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) { if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) { - createSubscriptionQueue(dest); + createSubscriptionQueue(dest,noLocal); } } else @@ -1191,7 +1207,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic else if(createNode) { setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,false,noWait); + send0_10QueueDeclare(dest,null,noLocal,noWait, false); sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), null,dest.getExchangeName(),dest, false); break; @@ -1206,7 +1222,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic verifySubject(dest); if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) { - createSubscriptionQueue(dest); + createSubscriptionQueue(dest, noLocal); } break; } @@ -1221,7 +1237,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic false); if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) { - createSubscriptionQueue(dest); + createSubscriptionQueue(dest,noLocal); } break; } @@ -1284,7 +1300,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void createSubscriptionQueue(AMQDestination dest) throws AMQException + private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException { QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null @@ -1297,11 +1313,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } node.setExclusive(true); node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,null,false,true); - node.addBinding(new Binding(dest.getAddressName(), - dest.getQueueName(),// should have one by now - dest.getSubject(), - Collections.<String,Object>emptyMap())); + send0_10QueueDeclare(dest,null,noLocal,true, false); + getQpidSession().exchangeBind(dest.getQueueName(), + dest.getAddressName(), + dest.getSubject(), + Collections.<String,Object>emptyMap()); sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), null,dest.getExchangeName(),dest, false); } @@ -1328,7 +1344,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic protected void acknowledgeImpl() { - RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags()); if(ranges.size() > 0 ) { @@ -1344,15 +1360,53 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // return the first <total number of msgs received on session> // messages sent by the brokers following the first rollback // after failover - _highestDeliveryTag.set(-1); + getHighestDeliveryTag().set(-1); // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to //messages that came from the old broker. _txRangeSet.clear(); _txSize = 0; - _unacknowledgedMessageTags.clear(); - _prefetchedMessageTags.clear(); + getUnacknowledgedMessageTags().clear(); + getPrefetchedMessageTags().clear(); super.resubscribe(); getQpidSession().sync(); } + + @Override + void stop() throws AMQException + { + super.stop(); + setUsingDispatcherForCleanup(true); + drainDispatchQueue(); + setUsingDispatcherForCleanup(false); + + for (BasicMessageConsumer consumer : getConsumers().values()) + { + List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + getPrefetchedMessageTags().addAll(tags); + } + + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); + RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); + RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + + prefetched.size()); + + for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + + for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + + flushProcessed(all, false); + getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + sync(); + } + } |