summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java166
1 files changed, 110 insertions, 56 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 8395c8f4b7..3902c726f3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -17,11 +17,6 @@
*/
package org.apache.qpid.client;
-import static org.apache.qpid.transport.Option.BATCH;
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
-
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
@@ -34,10 +29,8 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
-
import javax.jms.Destination;
import javax.jms.JMSException;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
@@ -55,11 +48,14 @@ import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.*;
+import static org.apache.qpid.transport.Option.BATCH;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
@@ -78,6 +74,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
private static Timer timer = new Timer("ack-flusher", true);
+
private static class Flusher extends TimerTask
{
@@ -120,7 +117,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private AMQException _currentException;
// a ref on the qpid connection
- protected org.apache.qpid.transport.Connection _qpidConnection;
+ private org.apache.qpid.transport.Connection _qpidConnection;
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
private TimerTask flushTask = null;
@@ -163,7 +160,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_qpidSession = _qpidConnection.createSession(name,1);
}
_qpidSession.setSessionListener(this);
- if (_transacted)
+ if (isTransacted())
{
_qpidSession.txSelect();
_qpidSession.setTransacted(true);
@@ -214,6 +211,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
+ protected Connection getQpidConnection()
+ {
+ return _qpidConnection;
+ }
+
//------- overwritten methods of class AMQSession
void failoverPrep()
@@ -234,17 +236,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + getChannelId());
}
// acknowledge this message
if (multiple)
{
- for (Long messageTag : _unacknowledgedMessageTags)
+ for (Long messageTag : getUnacknowledgedMessageTags())
{
if( messageTag <= deliveryTag )
{
addUnacked(messageTag.intValue());
- _unacknowledgedMessageTags.remove(messageTag);
+ getUnacknowledgedMessageTags().remove(messageTag);
}
}
//empty the list of unack messages
@@ -253,12 +255,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
else
{
addUnacked((int) deliveryTag);
- _unacknowledgedMessageTags.remove(deliveryTag);
+ getUnacknowledgedMessageTags().remove(deliveryTag);
}
long prefetch = getAMQConnection().getMaxPrefetch();
- if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
+ if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || getAcknowledgeMode() == javax.jms.Session.AUTO_ACKNOWLEDGE)
{
flushAcknowledgments();
}
@@ -276,7 +278,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (unackedCount > 0)
{
messageAcknowledge
- (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
+ (unacked, getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
clearUnacked();
}
}
@@ -444,8 +446,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
// release all unacked messages
RangeSet all = RangeSetFactory.createRangeSet();
- RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
- RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+ RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
+ RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());
for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
{
Range range = deliveredIter.next();
@@ -526,9 +528,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
- return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh,
- prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
+ return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
+ getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh,
+ prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
/**
@@ -593,7 +595,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Registers the consumer with the broker
*/
public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, MessageFilter messageSelector, int tag)
+ boolean nowait, int tag)
throws AMQException, FailoverException
{
boolean preAcquire = consumer.isPreAcquire();
@@ -630,7 +632,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
Option.UNRELIABLE);
- if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
+ if(capacity > 0 && getDispatcher() != null && (isStarted() || isImmediatePrefetch()))
{
// set the flow
getQpidSession().messageFlow(consumerTag,
@@ -648,12 +650,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Create an 0_10 message producer
*/
- public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final long producerId) throws JMSException
+ public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, final long producerId) throws JMSException
{
try
{
- return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
+ return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this,
getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
@@ -719,7 +721,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Declare a queue with the given queueName
*/
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait)
+ final boolean nowait, boolean passive)
throws AMQException, FailoverException
{
// do nothing this is only used by 0_8
@@ -729,7 +731,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Declare a queue with the given queueName
*/
public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait)
+ final boolean noLocal, final boolean nowait, boolean passive)
throws AMQException
{
AMQShortString queueName;
@@ -755,13 +757,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
amqd.isDurable() ? Option.DURABLE : Option.NONE,
- amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE,
+ passive ? Option.PASSIVE : Option.NONE);
}
else
{
QueueNode node = (QueueNode)amqd.getSourceNode();
+ Map<String,Object> arguments = new HashMap<String,Object>();
+ arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs());
+ if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null)
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
- node.getDeclareArgs(),
+ arguments,
node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
node.isDurable() ? Option.DURABLE : Option.NONE,
node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
@@ -795,15 +804,16 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (suspend)
{
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer consumer : getConsumers().values())
{
getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
Option.UNRELIABLE);
}
+ sync();
}
else
{
- for (BasicMessageConsumer_0_10 consumer : _consumers.values())
+ for (BasicMessageConsumer_0_10 consumer : getConsumers().values())
{
String consumerTag = String.valueOf(consumer.getConsumerTag());
//only set if msg list is null
@@ -918,11 +928,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return getCurrentException();
}
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait)
+ protected AMQShortString declareQueue(final AMQDestination amqd,
+ final boolean noLocal, final boolean nowait, final boolean passive)
throws AMQException
{
- /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+
return new FailoverNoopSupport<AMQShortString, AMQException>(
new FailoverProtectedOperation<AMQShortString, AMQException>()
{
@@ -939,14 +950,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
+ return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive);
}
- }, _connection).execute();
+ }, getAMQConnection()).execute();
}
- protected Long requestQueueDepth(AMQDestination amqd)
+ protected Long requestQueueDepth(AMQDestination amqd, boolean sync)
{
flushAcknowledgments();
+ if (sync)
+ {
+ getQpidSession().sync();
+ }
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
}
@@ -968,8 +983,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
protected void sendTxCompletionsIfNecessary()
{
// this is a heuristic, we may want to have that configurable
- if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 ||
- _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0))
+ if (_txSize > 0 && (getAMQConnection().getMaxPrefetch() == 1 ||
+ getAMQConnection().getMaxPrefetch() != 0 && _txSize % (getAMQConnection().getMaxPrefetch() / 2) == 0))
{
// send completed so consumer credits don't dry up
messageAcknowledge(_txRangeSet, false);
@@ -1039,7 +1054,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
_currentException = amqe;
}
- _connection.exceptionReceived(_currentException);
+ getAMQConnection().exceptionReceived(_currentException);
}
public AMQMessageDelegateFactory getMessageDelegateFactory()
@@ -1156,13 +1171,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
@SuppressWarnings("deprecation")
public void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
+ boolean noLocal,
boolean noWait) throws AMQException
{
- if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+ if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
{
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
- createSubscriptionQueue(dest);
+ createSubscriptionQueue(dest,noLocal);
}
}
else
@@ -1191,7 +1207,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
else if(createNode)
{
setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,false,noWait);
+ send0_10QueueDeclare(dest,null,noLocal,noWait, false);
sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
null,dest.getExchangeName(),dest, false);
break;
@@ -1206,7 +1222,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
verifySubject(dest);
if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
{
- createSubscriptionQueue(dest);
+ createSubscriptionQueue(dest, noLocal);
}
break;
}
@@ -1221,7 +1237,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
false);
if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
{
- createSubscriptionQueue(dest);
+ createSubscriptionQueue(dest,noLocal);
}
break;
}
@@ -1284,7 +1300,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void createSubscriptionQueue(AMQDestination dest) throws AMQException
+ private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException
{
QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
@@ -1297,11 +1313,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
node.setExclusive(true);
node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,null,false,true);
- node.addBinding(new Binding(dest.getAddressName(),
- dest.getQueueName(),// should have one by now
- dest.getSubject(),
- Collections.<String,Object>emptyMap()));
+ send0_10QueueDeclare(dest,null,noLocal,true, false);
+ getQpidSession().exchangeBind(dest.getQueueName(),
+ dest.getAddressName(),
+ dest.getSubject(),
+ Collections.<String,Object>emptyMap());
sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
null,dest.getExchangeName(),dest, false);
}
@@ -1328,7 +1344,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
protected void acknowledgeImpl()
{
- RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags);
+ RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
if(ranges.size() > 0 )
{
@@ -1344,15 +1360,53 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// return the first <total number of msgs received on session>
// messages sent by the brokers following the first rollback
// after failover
- _highestDeliveryTag.set(-1);
+ getHighestDeliveryTag().set(-1);
// Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to
//messages that came from the old broker.
_txRangeSet.clear();
_txSize = 0;
- _unacknowledgedMessageTags.clear();
- _prefetchedMessageTags.clear();
+ getUnacknowledgedMessageTags().clear();
+ getPrefetchedMessageTags().clear();
super.resubscribe();
getQpidSession().sync();
}
+
+ @Override
+ void stop() throws AMQException
+ {
+ super.stop();
+ setUsingDispatcherForCleanup(true);
+ drainDispatchQueue();
+ setUsingDispatcherForCleanup(false);
+
+ for (BasicMessageConsumer consumer : getConsumers().values())
+ {
+ List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+ getPrefetchedMessageTags().addAll(tags);
+ }
+
+ RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
+ RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());
+ RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
+ + prefetched.size());
+
+ for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+ {
+ Range range = deliveredIter.next();
+ all.add(range);
+ }
+
+ for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+ {
+ Range range = prefetchedIter.next();
+ all.add(range);
+ }
+
+ flushProcessed(all, false);
+ getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
+ getQpidSession().messageRelease(prefetched);
+ sync();
+ }
+
}