summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-10-18 10:10:19 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-10-18 10:10:19 +0000
commit4baafab6e9c393a7103c1a2f4da445e912f1a783 (patch)
tree5fe39002c6a4dea81a7ec0b37ad7277921d63c53
parent5e6cb3bf5178acf5ab84e9b2478007d0a86c6459 (diff)
downloadqpid-python-4baafab6e9c393a7103c1a2f4da445e912f1a783.tar.gz
QPID-637 : Patch provided by Aidan Skinner to ensure correct behaviour of session closure.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@585912 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java113
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java124
2 files changed, 110 insertions, 127 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index fee6690538..a0b79b135d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -112,22 +113,20 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
- *
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td>
* </table>
*
* @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For
- * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
- * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
- * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
- * has been reestablished. All fail-over protected operations should be placed in private methods, with
- * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
- * fail-over process sets a nowait flag and uses an async method call instead.
- *
+ * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
+ * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
+ * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
+ * has been reestablished. All fail-over protected operations should be placed in private methods, with
+ * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
+ * fail-over process sets a nowait flag and uses an async method call instead.
* @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
- * after looking at worse bottlenecks first.
+ * after looking at worse bottlenecks first.
*/
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -222,9 +221,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
- /**
- * Holds the highest received delivery tag.
- */
+ /** Holds the highest received delivery tag. */
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
/** Holds the dispatcher thread for this session. */
@@ -236,9 +233,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/** Holds all of the producers created by this session, keyed by their unique identifiers. */
private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
- /**
- * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods.
- */
+ /** Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods. */
private int _nextTag = 1;
/**
@@ -374,12 +369,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Creates a new session on a connection with the default message factory factory.
*
- * @param con The connection on which to create the session.
- * @param channelId The unique identifier for the session.
- * @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
- * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
- * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
int defaultPrefetchLow)
@@ -402,12 +397,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
- {
- checkNotClosed();
-
- return new JMSBytesMessage();
- }
+ checkNotClosed();
+ return new JMSBytesMessage();
}
/**
@@ -462,9 +453,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param exchangeName The exchange to bind the queue on.
*
* @throws AMQException If the queue cannot be bound for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
- *
* @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
*/
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
@@ -501,14 +490,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
*
* @throws JMSException If the JMS provider fails to close the session due to some internal error.
- *
* @todo Be aware of possible changes to parameter order as versions change.
- *
* @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
- * re-opened. May need to examine this more carefully.
- *
+ * re-opened. May need to examine this more carefully.
* @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
- * because the failover process sends the failover event before acquiring the mutex itself.
+ * because the failover process sends the failover event before acquiring the mutex itself.
*/
public void close(long timeout) throws JMSException
{
@@ -579,6 +565,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
synchronized (_connection.getFailoverMutex())
{
+ if (e instanceof AMQDisconnectedException)
+ {
+ if (_dispatcher != null)
+ {
+ // Failover failed and ain't coming back. Knife the dispatcher.
+ _dispatcher.interrupt();
+ }
+ }
synchronized (_messageDeliveryLock)
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
@@ -610,7 +604,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
@@ -861,12 +854,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MapMessage createMapMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
- {
- checkNotClosed();
-
- return new JMSMapMessage();
- }
+ checkNotClosed();
+ return new JMSMapMessage();
}
public javax.jms.Message createMessage() throws JMSException
@@ -876,12 +865,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
- {
- checkNotClosed();
-
- return (ObjectMessage) new JMSObjectMessage();
- }
+ checkNotClosed();
+ return (ObjectMessage) new JMSObjectMessage();
}
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
@@ -955,7 +940,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param exclusive Flag to indicate that the queue is exclusive to this client.
*
* @throws AMQException If the queue cannot be declared for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
@@ -1301,7 +1285,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* <li>Stop message delivery.</li>
* <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
* <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
- * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
+ * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
* </ul>
*
* <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
@@ -1311,7 +1295,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
* Not that this does not necessarily mean that the recovery has failed, but simply that it
* is not possible to tell if it has or not.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void recover() throws JMSException
@@ -1402,7 +1385,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting delivery tag:" + deliveryTag);
+ _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode());
}
AMQFrame basicRejectBody =
@@ -1423,7 +1406,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
* not mean that the rollback is known to have failed, merely that it is not known whether it
* failed or not.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void rollback() throws JMSException
@@ -1695,7 +1677,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
*
* @throws JMSException If the query fails for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
@@ -1768,10 +1749,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
*
* @throws AMQException If the session cannot be started for any reason.
- *
* @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
- * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
- * for each subsequent call to flow.. only need to do this if we have called stop.
+ * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
+ * for each subsequent call to flow.. only need to do this if we have called stop.
*/
void start() throws AMQException
{
@@ -2138,7 +2118,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param nowait
*
* @throws AMQException If the exchange cannot be declared for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void declareExchange(final AMQShortString name, final AMQShortString type,
@@ -2183,9 +2162,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* the client.
*
* @throws AMQException If the queue cannot be declared for any reason.
- *
* @todo Verify the destiation is valid or throw an exception.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
@@ -2229,7 +2206,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param queueName The name of the queue to delete.
*
* @throws JMSException If the queue could not be deleted for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void deleteQueue(final AMQShortString queueName) throws JMSException
@@ -2404,11 +2380,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_producers.put(new Long(producerId), producer);
}
- private void rejectAllMessages(boolean requeue)
- {
- rejectMessagesForConsumerTag(null, requeue);
- }
-
/**
* @param consumerTag The consumerTag to prune from queue or all if null
* @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
@@ -2528,7 +2499,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* should be unsuspended.
*
* @throws AMQException If the session cannot be suspended for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void suspendChannel(boolean suspend) throws AMQException // , FailoverException
@@ -2571,6 +2541,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final Object _lock = new Object();
private final AtomicLong _rollbackMark = new AtomicLong(-1);
+ private String dispatcherID = "" + System.identityHashCode(this);
public Dispatcher()
{
@@ -2606,6 +2577,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Reject messages on pre-dispatch queue
rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+ //Let the dispatcher deal with this when it gets to them.
// closeConsumer
consumer.markClosed();
@@ -2639,8 +2611,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- // should perhaps clear the _SQ here.
- // consumer._synchronousQueue.clear();
+ // cClear the _SQ here.
consumer.clearReceiveQueue();
}
@@ -2756,14 +2727,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (consumer == null)
{
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+ _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
+ message.getDeliverBody().deliveryTag + "] from queue "
+ message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
}
else
{
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
+ _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ") ["
+ + message.getDeliverBody().deliveryTag + "] from queue consumer("
+ consumer.debugIdentity() + ") is closed rejecting(requeue)...");
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 9c5c344275..ddaf0cfd93 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -142,9 +142,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -221,7 +221,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
_logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
- + _destination);
+ + _destination);
}
}
else
@@ -243,9 +243,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
//todo: handle case where connection has already been started, and the dispatcher has alreaded started
// putting values on the _synchronousQueue
- _messageListener.set(messageListener);
- _session.setHasMessageListeners();
- _session.startDistpatcherIfNecessary();
+ _messageListener.set(messageListener);
+ _session.setHasMessageListeners();
+ _session.startDistpatcherIfNecessary();
}
}
}
@@ -360,14 +360,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
long endtime = System.currentTimeMillis() + l;
while (System.currentTimeMillis() < endtime && o == null)
{
- try
+ try
{
o = _synchronousQueue.poll(endtime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
_logger.warn("Interrupted: " + e);
- if (isClosed())
+ if (isClosed())
{
return null;
}
@@ -381,11 +381,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
o = _synchronousQueue.take();
- }
+ }
catch (InterruptedException e)
{
_logger.warn("Interrupted: " + e);
- if (isClosed())
+ if (isClosed())
{
return null;
}
@@ -516,9 +516,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
// TODO: Be aware of possible changes to parameter order as versions change.
final AMQFrame cancelFrame =
- BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
- false); // nowait
+ BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
+ false); // nowait
try
{
@@ -577,7 +577,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_closedStack != null)
{
_logger.trace(_consumerTag + " markClosed():"
- + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -609,9 +609,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
+ _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+ messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
+ messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
if (debug)
{
@@ -696,15 +696,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
- case Session.PRE_ACKNOWLEDGE:
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- break;
+ case Session.PRE_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
- case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
- break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ break;
}
}
@@ -714,43 +714,43 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
- case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetchHigh)
- {
- _dups_ok_acknowledge_send = true;
- }
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
- if (_outstanding <= _prefetchLow)
- {
- _dups_ok_acknowledge_send = false;
- }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
- if (_dups_ok_acknowledge_send)
- {
- if (!_session.isInRecovery())
+ if (_dups_ok_acknowledge_send)
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ }
}
- }
- break;
+ break;
- case Session.AUTO_ACKNOWLEDGE:
- // we do not auto ack a message if the application code called recover()
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.AUTO_ACKNOWLEDGE:
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
}
}
@@ -883,6 +883,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
{
+ _closed.set(true);
_receivingThread.interrupt();
}
@@ -938,6 +939,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
Iterator iterator = _synchronousQueue.iterator();
+ int initialSize = _synchronousQueue.size();
+
+ boolean removed = false;
while (iterator.hasNext())
{
@@ -952,16 +956,24 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
iterator.remove();
+ removed = true;
}
else
{
_logger.error("Queue contained a :" + o.getClass()
- + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
iterator.remove();
+ removed = true;
}
}
+ if (removed && (initialSize == _synchronousQueue.size()))
+ {
+ _logger.error("Queue had content removed but didn't change in size." + initialSize);
+ }
+
+
if (_synchronousQueue.size() != 0)
{
_logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
@@ -974,7 +986,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public String debugIdentity()
{
- return String.valueOf(_consumerTag);
+ return String.valueOf(_consumerTag) + "[" + System.identityHashCode(this) + "]";
}
public void clearReceiveQueue()