summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java552
1 files changed, 271 insertions, 281 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 3b51fb8db0..5203a27f42 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -63,63 +63,54 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
-import org.apache.qpid.client.message.JMSMapMessage;
-import org.apache.qpid.client.message.JMSObjectMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
-import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.ReturnMessage;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td>
* </table>
*
* @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For
- * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
- * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
- * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
- * has been reestablished. All fail-over protected operations should be placed in private methods, with
- * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
- * fail-over process sets a nowait flag and uses an async method call instead.
- *
+ * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
+ * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
+ * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
+ * has been reestablished. All fail-over protected operations should be placed in private methods, with
+ * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
+ * fail-over process sets a nowait flag and uses an async method call instead.
* @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
- * after looking at worse bottlenecks first.
+ * after looking at worse bottlenecks first.
*/
-public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
+public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
- public static final class IdToConsumerMap
+
+
+ public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
- private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
-
+ private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
- public BasicMessageConsumer get(int id)
+ public C get(int id)
{
- if((id & 0xFFFFFFF0) == 0)
+ if ((id & 0xFFFFFFF0) == 0)
{
- return _fastAccessConsumers[id];
+ return (C) _fastAccessConsumers[id];
}
else
{
@@ -127,12 +118,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+ public C put(int id, C consumer)
{
- BasicMessageConsumer oldVal;
- if((id & 0xFFFFFFF0) == 0)
+ C oldVal;
+ if ((id & 0xFFFFFFF0) == 0)
{
- oldVal = _fastAccessConsumers[id];
+ oldVal = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = consumer;
}
else
@@ -144,13 +135,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
-
- public BasicMessageConsumer remove(int id)
+ public C remove(int id)
{
- BasicMessageConsumer consumer;
- if((id & 0xFFFFFFF0) == 0)
+ C consumer;
+ if ((id & 0xFFFFFFF0) == 0)
{
- consumer = _fastAccessConsumers[id];
+ consumer = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = null;
}
else
@@ -162,15 +152,15 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- public Collection<BasicMessageConsumer> values()
+ public Collection<C> values()
{
- ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+ ArrayList<C> values = new ArrayList<C>();
- for(int i = 0; i < 16; i++)
+ for (int i = 0; i < 16; i++)
{
- if(_fastAccessConsumers[i] != null)
+ if (_fastAccessConsumers[i] != null)
{
- values.add(_fastAccessConsumers[i]);
+ values.add((C) _fastAccessConsumers[i]);
}
}
values.addAll(_slowAccessConsumers.values());
@@ -178,11 +168,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
return values;
}
-
public void clear()
{
_slowAccessConsumers.clear();
- for(int i = 0; i<16; i++)
+ for (int i = 0; i < 16; i++)
{
_fastAccessConsumers[i] = null;
}
@@ -192,8 +181,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
- /** Used for debugging in the dispatcher. */
- private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class);
/** The default maximum number of prefetched message at which to suspend the channel. */
public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
@@ -243,7 +230,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/** Holds this session unique identifier, used to distinguish it from other sessions. */
protected int _channelId;
- /** @todo This does not appear to be set? */
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
@@ -270,8 +256,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
+ new ConcurrentHashMap<C, String>();
/**
* Used to hold incoming messages.
@@ -280,19 +266,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*/
protected final FlowControllingBlockingQueue _queue;
- /**
- * Holds the highest received delivery tag.
- */
+ /** Holds the highest received delivery tag. */
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
- /**
- * All the not yet acknowledged message tags
- */
+ /** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
- /**
- * All the delivered message tags
- */
+ /** All the delivered message tags */
protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
/** Holds the dispatcher thread for this session. */
@@ -314,16 +294,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
- protected final IdToConsumerMap _consumers = new IdToConsumerMap();
-
- //Map<AMQShortString, BasicMessageConsumer> _consumers =
- //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+ protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+
+ //Map<AMQShortString, BasicMessageConsumer> _consumers =
+ //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/**
* Contains a list of consumers which have been removed but which might still have
* messages to acknowledge, eg in client ack or transacted modes
*/
- private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
+ private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
@@ -377,10 +357,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
private boolean _dirty;
- /** Has failover occured on this session */
- private boolean _failedOver;
-
-
+ /** Has failover occured on this session with outstanding actions to commit? */
+ private boolean _failedOverDirty;
private static final class FlowControlIndicator
{
@@ -388,7 +366,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public synchronized void setFlowControl(boolean flowControl)
{
- _flowControl= flowControl;
+ _flowControl = flowControl;
notify();
}
@@ -412,7 +390,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
*/
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
@@ -445,21 +423,25 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
+ private final AtomicBoolean _suspendState = new AtomicBoolean();
+
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ _suspendState.set(true);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
public void underThreshold(int currentValue)
{
- _logger.debug(
- "Below threshold(" + _defaultPrefetchLowMark
- + ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
+ _logger.debug(
+ "Below threshold(" + _defaultPrefetchLowMark
+ + ") so unsuspending channel. Current value is " + currentValue);
+ _suspendState.set(false);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
});
@@ -499,10 +481,30 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
close(-1);
}
+ public void checkNotClosed() throws JMSException
+ {
+ try
+ {
+ super.checkNotClosed();
+ }
+ catch (IllegalStateException ise)
+ {
+ // if the Connection has closed then we should throw any exception that has occured that we were not waiting for
+ AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
+
+ if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
+ {
+ ise.setLinkedException(manager.getLastException());
+ }
+
+ throw ise;
+ }
+ }
+
public BytesMessage createBytesMessage() throws JMSException
{
checkNotClosed();
- return new JMSBytesMessage();
+ return new JMSBytesMessage(getMessageDelegateFactory());
}
/**
@@ -515,7 +517,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
if (isClosed())
{
throw new IllegalStateException("Session is already closed");
- }
+ }
else if (hasFailedOver())
{
throw new IllegalStateException("has failed over");
@@ -560,39 +562,35 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param exchangeName The exchange to bind the queue on.
*
* @throws AMQException If the queue cannot be bound for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
- *
* @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
*/
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName,final AMQDestination destination) throws AMQException
+ final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
{
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendQueueBind(queueName,routingKey,arguments,exchangeName,destination);
+ sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
return null;
}
}, _connection).execute();
}
-
- public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
+ public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException
{
- if( consumer.getQueuename() != null)
+ if (consumer.getQueuename() != null)
{
- bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd);
+ bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd);
}
}
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException;
+ final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
/**
-
* Closes the session.
*
* <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
@@ -602,14 +600,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
*
* @throws JMSException If the JMS provider fails to close the session due to some internal error.
- *
* @todo Be aware of possible changes to parameter order as versions change.
- *
* @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
- * re-opened. May need to examine this more carefully.
- *
+ * re-opened. May need to examine this more carefully.
* @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
- * because the failover process sends the failover event before acquiring the mutex itself.
+ * because the failover process sends the failover event before acquiring the mutex itself.
*/
public void close(long timeout) throws JMSException
{
@@ -617,13 +612,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
_logger.info("Closing session: " + this); // + ":"
- // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
// Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session.
@@ -634,7 +629,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
try
{
- sendClose(timeout);
+ sendClose(timeout);
}
catch (AMQException e)
{
@@ -685,7 +680,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
if (!_closed.getAndSet(true))
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
synchronized (_messageDeliveryLock)
{
@@ -701,7 +696,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
amqe = new AMQException("Closing session forcibly", e);
}
-
_connection.deregisterSession(_channelId);
closeProducersAndConsumers(amqe);
}
@@ -719,12 +713,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
{
- checkTransacted();
+ checkTransacted();
try
{
@@ -743,6 +736,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
// Commits outstanding messages and acknowledgments
sendCommit();
+ markClean();
}
catch (AMQException e)
{
@@ -756,11 +750,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public abstract void sendCommit() throws AMQException, FailoverException;
- public void confirmConsumerCancelled(AMQShortString consumerTag)
+
+ public void confirmConsumerCancelled(int consumerTag)
{
// Remove the consumer from the map
- BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
+ C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
if (!consumer.isNoConsume()) // Normal Consumer
@@ -788,10 +783,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
// consumer.markClosed();
-
-
if (consumer.isAutoClose())
- {
+ {
// There is a small window where the message is between the two queues in the dispatcher.
if (consumer.isClosed())
{
@@ -802,6 +795,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
deregisterConsumer(consumer);
}
+ else
+ {
+ _queue.add(new CloseConsumerMessage(consumer));
+ }
}
}
}
@@ -847,7 +844,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
false, false);
}
- public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ public C createExclusiveConsumer(Destination destination) throws JMSException
{
checkValidDestination(destination);
@@ -855,7 +852,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
false, false);
}
-
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -873,7 +869,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
messageSelector, null, false, false);
}
-
public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
@@ -883,7 +878,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
messageSelector, null, false, false);
}
-
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
String selector) throws JMSException
{
@@ -920,7 +914,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
- throws JMSException
+ throws JMSException
{
checkNotClosed();
checkValidTopic(topic);
@@ -929,8 +923,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_subscriptions.get(name).close();
}
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ C consumer = (C) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
@@ -940,7 +934,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public MapMessage createMapMessage() throws JMSException
{
checkNotClosed();
- return new JMSMapMessage();
+ return new JMSMapMessage(getMessageDelegateFactory());
}
public javax.jms.Message createMessage() throws JMSException
@@ -951,7 +945,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public ObjectMessage createObjectMessage() throws JMSException
{
checkNotClosed();
- return (ObjectMessage) new JMSObjectMessage();
+ return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory());
}
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
@@ -962,23 +956,23 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
return msg;
}
- public BasicMessageProducer createProducer(Destination destination) throws JMSException
+ public P createProducer(Destination destination) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
- public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException
+ public P createProducer(Destination destination, boolean immediate) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+ public P createProducer(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate);
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
+ public P createProducer(Destination destination, boolean mandatory, boolean immediate,
boolean waitUntilSent) throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
@@ -988,7 +982,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
+ return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1025,24 +1019,44 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param exclusive Flag to indicate that the queue is exclusive to this client.
*
* @throws AMQException If the queue cannot be declared for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
final boolean exclusive) throws AMQException
{
+ createQueue(name, autoDelete, durable, exclusive, null);
+ }
+
+ /**
+ * Declares the named queue.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param name The name of the queue to declare.
+ * @param autoDelete
+ * @param durable Flag to indicate that the queue is durable.
+ * @param exclusive Flag to indicate that the queue is exclusive to this client.
+ * @param arguments Arguments used to set special properties of the queue
+ *
+ * @throws AMQException If the queue cannot be declared for any reason.
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
+ final boolean exclusive, final Map<String, Object> arguments) throws AMQException
+ {
new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendCreateQueue(name, autoDelete, durable, exclusive);
+ sendCreateQueue(name, autoDelete, durable, exclusive, arguments);
return null;
}
}, _connection).execute();
}
public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
- final boolean exclusive)throws AMQException, FailoverException;
+ final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException;
+
/**
* Creates a QueueReceiver
*
@@ -1056,7 +1070,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
+ C consumer = (C) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1075,7 +1089,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector);
+ C consumer = (C) createConsumer(destination, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1093,7 +1107,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
+ C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1112,7 +1126,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector);
+ C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1127,11 +1141,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public StreamMessage createStreamMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
+ // calls through connection.closeAllSessions which is also called by the public connection.close()
+ // with a null cause
+ // When we are closing the Session due to a protocol session error we simply create a new AMQException
+ // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
+ // We need to determin here if the connection should be
+
+ synchronized (getFailoverMutex())
{
checkNotClosed();
- return new JMSStreamMessage();
+ return new JMSStreamMessage(getMessageDelegateFactory());
}
}
@@ -1150,10 +1171,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
}
-
/**
* Creates a non-durable subscriber with a message selector
*
@@ -1171,7 +1191,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
}
public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
@@ -1185,14 +1205,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public TextMessage createTextMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
checkNotClosed();
- return new JMSTextMessage();
+ return new JMSTextMessage(getMessageDelegateFactory());
}
}
+ protected Object getFailoverMutex()
+ {
+ return _connection.getFailoverMutex();
+ }
+
public TextMessage createTextMessage(String text) throws JMSException
{
@@ -1340,17 +1365,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
_logger.debug("Message[" + message.toString() + "] received in session");
}
-
- if (message instanceof ReturnMessage)
- {
- // Return of the bounced message.
- returnBouncedMessage((ReturnMessage)message);
- }
- else
- {
- _highestDeliveryTag.set(message.getDeliveryTag());
- _queue.add(message);
- }
+ _highestDeliveryTag.set(message.getDeliveryTag());
+ _queue.add(message);
}
public void declareAndBind(AMQDestination amqd)
@@ -1360,7 +1376,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQProtocolHandler protocolHandler = getProtocolHandler();
declareExchange(amqd, protocolHandler, false);
AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
- bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
}
/**
@@ -1375,7 +1391,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* <li>Stop message delivery.</li>
* <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
* <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
- * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
+ * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
* </ul>
*
* <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
@@ -1429,7 +1445,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- abstract void sendRecover() throws AMQException, FailoverException;
+ protected abstract void sendRecover() throws AMQException, FailoverException;
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
@@ -1465,7 +1481,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
* not mean that the rollback is known to have failed, merely that it is not known whether it
* failed or not.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void rollback() throws JMSException
@@ -1507,8 +1522,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public abstract void releaseForRollback();
- public abstract void sendRollback() throws AMQException, FailoverException ;
-
+ public abstract void sendRollback() throws AMQException, FailoverException;
public void run()
{
@@ -1576,7 +1590,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
- else
+ else // Queue Browser
{
if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
@@ -1591,7 +1605,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
+ protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
final boolean noConsume, final boolean autoClose) throws JMSException
{
@@ -1615,10 +1629,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
messageSelector = selector;
}
- return new FailoverRetrySupport<MessageConsumer, JMSException>(
- new FailoverProtectedOperation<MessageConsumer, JMSException>()
+ return new FailoverRetrySupport<C, JMSException>(
+ new FailoverProtectedOperation<C, JMSException>()
{
- public MessageConsumer execute() throws JMSException, FailoverException
+ public C execute() throws JMSException, FailoverException
{
checkNotClosed();
@@ -1630,13 +1644,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
final FieldTable ft = FieldTableFactory.newFieldTable();
// if (rawSelector != null)
// ft.put("headers", rawSelector.getDataAsBytes());
- if (rawSelector != null)
+ // rawSelector is used by HeadersExchange and is not a JMS Selector
+ if (rawSelector != null)
{
ft.addAll(rawSelector);
}
+
+ if (messageSelector != null)
+ {
+ ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
+ }
- BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow,
- noLocal,exclusive, messageSelector, ft, noConsume, autoClose);
+ C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
if (_messageListener != null)
{
@@ -1679,9 +1699,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
- final boolean noConsume, final boolean autoClose) throws JMSException;
+ public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
+ final boolean noConsume, final boolean autoClose) throws JMSException;
/**
* Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
@@ -1689,9 +1709,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @param consumer the consum
*/
- void deregisterConsumer(BasicMessageConsumer consumer)
+ void deregisterConsumer(C consumer)
{
- if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
+ if (_consumers.remove(consumer.getConsumerTag()) != null)
{
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
@@ -1744,12 +1764,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
*
* @throws JMSException If the query fails for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws JMSException;
-
+
public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
/**
@@ -1771,7 +1790,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*/
void resubscribe() throws AMQException
{
- _failedOver = true;
+ if (_dirty)
+ {
+ _failedOverDirty = true;
+ }
resubscribeProducers();
resubscribeConsumers();
}
@@ -1790,10 +1812,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
*
* @throws AMQException If the session cannot be started for any reason.
- *
* @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
- * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
- * for each subsequent call to flow.. only need to do this if we have called stop.
+ * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
+ * for each subsequent call to flow.. only need to do this if we have called stop.
*/
void start() throws AMQException
{
@@ -1978,12 +1999,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ final Iterator<C> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = it.next();
+ final C con = it.next();
if (error != null)
{
con.notifyError(error);
@@ -1994,7 +2015,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
// at this point the _consumers map will be empty
- if (_dispatcher != null)
+ if (_dispatcher != null)
{
_dispatcher.close();
_dispatcher = null;
@@ -2014,7 +2035,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
final Iterator it = clonedProducers.iterator();
while (it.hasNext())
{
- final BasicMessageProducer prod = (BasicMessageProducer) it.next();
+ final P prod = (P) it.next();
prod.close();
}
// at this point the _producers map is empty
@@ -2062,20 +2083,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @param queueName
*/
- private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
+ private void consumeFromQueue(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
int tagId = _nextTag++;
- // need to generate a consumer tag on the client so we can exploit the nowait flag
- AMQShortString tag = new AMQShortString(Integer.toString(tagId));
- consumer.setConsumerTag(tag);
+ consumer.setConsumerTag(tagId);
// we must register the consumer in the map before we actually start listening
_consumers.put(tagId, consumer);
try
{
- sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag);
+ sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
}
catch (AMQException e)
{
@@ -2085,27 +2104,27 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException;
+ public abstract void sendConsume(C consumer, AMQShortString queueName,
+ AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
- private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+ private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, false);
}
- private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+ private P createProducerImpl(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent) throws JMSException
{
- return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
- new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
+ return new FailoverRetrySupport<P, JMSException>(
+ new FailoverProtectedOperation<P, JMSException>()
{
- public BasicMessageProducer execute() throws JMSException, FailoverException
+ public P execute() throws JMSException, FailoverException
{
checkNotClosed();
long producerId = getNextProducerId();
- BasicMessageProducer producer = createMessageProducer(destination, mandatory,
- immediate, waitUntilSent, producerId);
+ P producer = createMessageProducer(destination, mandatory,
+ immediate, waitUntilSent, producerId);
registerProducer(producerId, producer);
return producer;
@@ -2113,21 +2132,20 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent, long producerId);
+ public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
+ final boolean immediate, final boolean waitUntilSent, long producerId);
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
}
-
/**
* Returns the number of messages currently queued for the given destination.
*
* <p/>Note that this operation automatically retries in the event of fail-over.
*
- * @param amqd The destination to be checked
+ * @param amqd The destination to be checked
*
* @return the number of queued messages.
*
@@ -2147,7 +2165,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
+ protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
/**
* Declares the named exchange and type of exchange.
@@ -2160,7 +2178,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param nowait
*
* @throws AMQException If the exchange cannot be declared for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void declareExchange(final AMQShortString name, final AMQShortString type,
@@ -2177,8 +2194,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException;
-
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Declares a queue for a JMS destination.
@@ -2196,9 +2212,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* the client.
*
* @throws AMQException If the queue cannot be declared for any reason.
- *
* @todo Verify the destiation is valid or throw an exception.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
@@ -2224,7 +2238,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException;
+ public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
/**
* Undeclares the specified queue.
@@ -2234,7 +2248,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param queueName The name of the queue to delete.
*
* @throws JMSException If the queue could not be deleted for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected void deleteQueue(final AMQShortString queueName) throws JMSException
@@ -2256,7 +2269,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
+ public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
private long getNextProducerId()
{
@@ -2292,12 +2305,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ final Iterator<C> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = it.next();
+ final C con = it.next();
con.markClosed();
}
// at this point the _consumers map will be empty
@@ -2332,7 +2345,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @throws AMQException
*/
- private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException
+ private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException
{
AMQDestination amqd = consumer.getDestination();
@@ -2345,8 +2358,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
// store the consumer queue name
consumer.setQueuename(queueName);
- // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
@@ -2397,15 +2409,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private void rejectAllMessages(boolean requeue)
{
- rejectMessagesForConsumerTag(null, requeue);
+ rejectMessagesForConsumerTag(0, requeue, true);
}
/**
* @param consumerTag The consumerTag to prune from queue or all if null
* @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
+ * @param rejectAllConsumers
*/
- private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
+ private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
{
Iterator messages = _queue.iterator();
if (_logger.isInfoEnabled())
@@ -2426,12 +2439,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString()))
+ if (rejectAllConsumers || (message.getConsumerTag() == consumerTag))
{
if (_logger.isDebugEnabled())
{
_logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
- + message.getDeliveryTag());
+ + message.getDeliveryTag());
}
messages.remove();
@@ -2448,12 +2461,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private void resubscribeConsumers() throws AMQException
{
- ArrayList consumers = new ArrayList(_consumers.values());
+ ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
_consumers.clear();
- for (Iterator it = consumers.iterator(); it.hasNext();)
- {
- BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+ for (C consumer : consumers)
+ {
consumer.failedOver();
registerConsumer(consumer, true);
}
@@ -2465,53 +2477,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
- BasicMessageProducer producer = (BasicMessageProducer) it.next();
+ P producer = (P) it.next();
producer.resubscribe();
}
}
- private void returnBouncedMessage(final ReturnMessage msg)
- {
- _connection.performConnectionTask(new Runnable()
- {
- public void run()
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage =
- _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
- AMQShortString reason = msg.getReplyText();
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
- }
- else
- {
- _connection.exceptionReceived(
- new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
- }
-
- }
- catch (Exception e)
- {
- _logger.error(
- "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
- e);
- }
- }
- });
- }
-
/**
* Suspends or unsuspends this session.
*
@@ -2519,7 +2489,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* should be unsuspended.
*
* @throws AMQException If the session cannot be suspended for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException
@@ -2560,7 +2529,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
return getAMQConnection().getMaxPrefetch() > 0;
}
-
/** Signifies that the session has pending sends to commit. */
public void markDirty()
{
@@ -2571,7 +2539,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public void markClean()
{
_dirty = false;
- _failedOver = false;
+ _failedOverDirty = false;
}
/**
@@ -2581,7 +2549,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*/
public boolean hasFailedOver()
{
- return _failedOver;
+ return _failedOverDirty;
}
/**
@@ -2604,12 +2572,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_flowControl.setFlowControl(active);
}
-
public void checkFlowControl() throws InterruptedException
{
- synchronized(_flowControl)
+ synchronized (_flowControl)
{
- while(!_flowControl.getFlowControl())
+ while (!_flowControl.getFlowControl())
{
_flowControl.wait();
}
@@ -2617,6 +2584,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
+ /** Used for debugging in the dispatcher. */
+ private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
class Dispatcher extends Thread
@@ -2629,6 +2599,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private final AtomicLong _rollbackMark = new AtomicLong(-1);
private String dispatcherID = "" + System.identityHashCode(this);
+
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -2647,7 +2619,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- public void rejectPending(BasicMessageConsumer consumer)
+ public void rejectPending(C consumer)
{
synchronized (_lock)
{
@@ -2662,7 +2634,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
consumer.rollbackPendingMessages();
// Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
//Let the dispatcher deal with this when it gets to them.
// closeConsumer
@@ -2689,7 +2661,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (C consumer : _consumers.values())
{
if (!consumer.isNoConsume())
{
@@ -2755,7 +2727,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_lock.wait();
}
- if (tagLE(deliveryTag, _rollbackMark.get()))
+ if (!(message instanceof CloseConsumerMessage)
+ && tagLE(deliveryTag, _rollbackMark.get()))
{
rejectMessage(message, true);
}
@@ -2816,8 +2789,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
//This if block is not needed anymore as bounce messages are handled separately
//if (message.getDeliverBody() != null)
//{
- final BasicMessageConsumer consumer =
- _consumers.get(message.getConsumerTag().toIntValue());
+ final C consumer =
+ _consumers.get(message.getConsumerTag());
if ((consumer == null) || consumer.isClosed())
{
@@ -2826,14 +2799,26 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
if (consumer == null)
{
_dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliveryTag() + "] from queue "
- + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
+ + message.getDeliveryTag() + "] from queue "
+ + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
}
else
{
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliveryTag() + "] from queue " + " consumer("
- + message.getConsumerTag() + ") is closed rejecting(requeue)...");
+ if (consumer.isNoConsume())
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+ + message.getDeliveryTag() + "] from queue " + " consumer("
+ + message.getConsumerTag() + ") is closed and a browser so dropping...");
+ //DROP MESSAGE
+ return;
+
+ }
+ else
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+ + message.getDeliveryTag() + "] from queue " + " consumer("
+ + message.getConsumerTag() + ") is closed rejecting(requeue)...");
+ }
}
}
// Don't reject if we're already closing
@@ -2850,9 +2835,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- abstract boolean tagLE(long tag1, long tag2);
+ protected abstract boolean tagLE(long tag1, long tag2);
+
+ protected abstract boolean updateRollbackMark(long current, long deliveryTag);
- abstract boolean updateRollbackMark(long current, long deliveryTag);
+ public abstract AMQMessageDelegateFactory getMessageDelegateFactory();
/*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
boolean read) throws AMQException
@@ -2880,9 +2867,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private class SuspenderRunner implements Runnable
{
- private boolean _suspend;
+ private AtomicBoolean _suspend;
- public SuspenderRunner(boolean suspend)
+ public SuspenderRunner(AtomicBoolean suspend)
{
_suspend = suspend;
}
@@ -2891,7 +2878,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
try
{
- suspendChannel(_suspend);
+ synchronized (_suspensionLock)
+ {
+ suspendChannel(_suspend.get());
+ }
}
catch (AMQException e)
{