summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java290
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java133
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java35
6 files changed, 379 insertions, 124 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index 5f5b7ccad1..5a9b9b54af 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -43,16 +44,24 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
+
_logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId());
AMQChannel channel = session.getChannel(evt.getChannelId());
BasicRecoverBody body = evt.getMethod();
-
+
if (channel == null)
{
throw body.getChannelNotFoundException(evt.getChannelId());
}
channel.resend(session, body.requeue);
+
+ if (!body.nowait)
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicRecoverOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 03a70d7f39..413524b6d8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -532,7 +532,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (_started)
{
- session.start();
+ try
+ {
+ session.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
return session;
}
@@ -690,7 +697,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
while (it.hasNext())
{
final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue();
- s.start();
+ try
+ {
+ s.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
_started = true;
}
@@ -703,7 +717,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
for (Iterator i = _sessions.values().iterator(); i.hasNext();)
{
- ((AMQSession) i.next()).stop();
+ try
+ {
+ ((AMQSession) i.next()).stop();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
_started = false;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index c1e5c8b555..81f5bbfcc2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -100,6 +100,9 @@ import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -191,6 +194,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _hasMessageListeners;
+ private boolean _suspended;
+
+ private final Object _suspensionLock = new Object();
+
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
@@ -285,8 +293,50 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//fixme awaitTermination
}
- }
+ public void rollback()
+ {
+
+ synchronized (_lock)
+ {
+ boolean isStopped = connectionStopped();
+
+ if (!isStopped)
+ {
+ setConnectionStopped(true);
+ }
+
+ rejectAllMessages(true);
+
+ _logger.debug("Session Pre Dispatch Queue cleared");
+
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ consumer.rollback();
+ }
+
+ setConnectionStopped(isStopped);
+ }
+
+ }
+
+ public void rejectPending(AMQShortString consumerTag)
+ {
+ synchronized (_lock)
+ {
+ boolean stopped = connectionStopped();
+
+ _dispatcher.setConnectionStopped(false);
+
+ rejectMessagesForConsumerTag(consumerTag, true);
+
+ if (stopped)
+ {
+ _dispatcher.setConnectionStopped(stopped);
+ }
+ }
+ }
+ }
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry)
@@ -328,7 +378,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
- suspendChannel();
+ new Thread(new SuspenderRunner(true)).start();
}
}
@@ -337,7 +387,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
- unsuspendChannel();
+ new Thread(new SuspenderRunner(false)).start();
}
}
});
@@ -480,16 +530,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rollback() throws JMSException
{
- checkTransacted();
- try
- {
- // TODO: Be aware of possible changes to parameter order as versions change.
- getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
- }
- catch (AMQException e)
+ synchronized (_suspensionLock)
{
- throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+ checkTransacted();
+ try
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
+ {
+// suspendChannel(true);
+ }
+
+ _connection.getProtocolHandler().syncWrite(
+ TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
+ if (!isSuspended)
+ {
+// suspendChannel(false);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+ }
}
}
@@ -597,6 +670,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
+
+ public boolean isSuspended()
+ {
+ return _suspended;
+ }
+
+
/**
* Called when the server initiates the closure of the session unilaterally.
*
@@ -737,14 +817,45 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotTransacted(); // throws IllegalStateException if a transacted session
// this is set only here, and the before the consumer's onMessage is called it is set to false
_inRecovery = true;
- for (BasicMessageConsumer consumer : _consumers.values())
+ try
{
- consumer.clearUnackedMessages();
+
+ boolean isSuspended = isSuspended();
+
+// if (!isSuspended)
+// {
+// suspendChannel(true);
+// }
+
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ consumer.clearUnackedMessages();
+ }
+
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ false, // nowait
+ false) // requeue
+ , BasicRecoverOkBody.class);
+
+// if (_dispatcher != null)
+// {
+// _dispatcher.rollback();
+// }
+//
+// if (!isSuspended)
+// {
+// suspendChannel(false);
+// }
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
}
- // TODO: Be aware of possible changes to parameter order as versions change.
- getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- false)); // requeue
}
boolean isInRecovery()
@@ -1057,7 +1168,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (AMQInvalidRoutingKeyException e)
{
- JMSException ide = new InvalidDestinationException("Invalid routing key:"+amqd.getRoutingKey().toString());
+ JMSException ide = new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
ide.setLinkedException(e);
throw ide;
}
@@ -1731,7 +1842,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _channelId;
}
- void start()
+ void start() throws AMQException
{
//fixme This should be controlled by _stopped as it pairs with the stop method
//fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled.
@@ -1740,7 +1851,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to resume delivery
- unsuspendChannel();
+ suspendChannel(false);
}
if (hasMessageListeners())
@@ -1773,10 +1884,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- void stop()
+ void stop() throws AMQException
{
//stop the server delivering messages to this session
- suspendChannel();
+ suspendChannel(true);
if (_dispatcher != null)
{
@@ -1837,6 +1948,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_destinationConsumerCount.remove(dest);
}
}
+
+ //ensure we remove the messages from the consumer even if the dispatcher hasn't started
+ if (_dispatcher == null)
+ {
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+ }
}
}
@@ -1889,35 +2006,54 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- private void suspendChannel()
+ private void suspendChannel(boolean suspend) throws AMQException
{
- _logger.warn("Suspending channel");
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- false); // active
- getProtocolHandler().writeFrame(channelFlowFrame);
- }
+ synchronized (_suspensionLock)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
+ }
- private void unsuspendChannel()
- {
- _logger.warn("Unsuspending channel");
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- true); // active
- getProtocolHandler().writeFrame(channelFlowFrame);
+ _suspended = suspend;
+
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ !suspend); // active
+
+ _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ }
}
+
public void confirmConsumerCancelled(AMQShortString consumerTag)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
- if ((consumer != null) && (consumer.isAutoClose()))
+ if (consumer != null)
{
- consumer.closeWhenNoMessages(true);
+ if (consumer.isAutoClose())
+ {
+ consumer.closeWhenNoMessages(true);
+ }
+ else
+ {
+ consumer.rollback();
+ }
}
- }
+ //Flush any pending messages for this consumerTag
+ if (_dispatcher != null)
+ {
+ _dispatcher.rejectPending(consumerTag);
+ }
+ else
+ {
+ rejectMessagesForConsumerTag(consumerTag, true);
+ }
+ }
/*
* I could have combined the last 3 methods, but this way it improves readability
@@ -2008,5 +2144,75 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
+ private class SuspenderRunner implements Runnable
+ {
+ private boolean _suspend;
+
+ public SuspenderRunner(boolean suspend)
+ {
+ _suspend = suspend;
+ }
+
+ public void run()
+ {
+ try
+ {
+ suspendChannel(_suspend);
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Unable to suspend channel");
+ }
+ }
+ }
+
+
+ private void rejectAllMessages(boolean requeue)
+ {
+ rejectMessagesForConsumerTag(null, requeue);
+ }
+
+ /** @param consumerTag The consumerTag to prune from queue or all if null
+ * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
+ */
+
+ private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
+ {
+ Iterator messages = _queue.iterator();
+
+ while (messages.hasNext())
+ {
+ UnprocessedMessage message = (UnprocessedMessage) messages.next();
+
+ if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Removing message from _queue:" + message);
+ }
+
+ messages.remove();
+
+// rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+
+ _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
+ }
+ else
+ {
+ _logger.error("Pruned pending message for consumer:" + consumerTag);
+ }
+ }
+ }
+
+ public void rejectMessage(long deliveryTag, boolean requeue)
+ {
+ AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ deliveryTag,
+ requeue);
+
+ _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 33fd64f368..496e377435 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -49,9 +49,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
- /**
- * The connection being used by this consumer
- */
+ /** The connection being used by this consumer */
private AMQConnection _connection;
private String _messageSelector;
@@ -60,33 +58,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private AMQDestination _destination;
- /**
- * When true indicates that a blocking receive call is in progress
- */
+ /** When true indicates that a blocking receive call is in progress */
private final AtomicBoolean _receiving = new AtomicBoolean(false);
- /**
- * Holds an atomic reference to the listener installed.
- */
+ /** Holds an atomic reference to the listener installed. */
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
- /**
- * The consumer tag allows us to close the consumer by sending a jmsCancel method to the
- * broker
- */
+ /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
private AMQShortString _consumerTag;
- /**
- * We need to know the channel id when constructing frames
- */
+ /** We need to know the channel id when constructing frames */
private int _channelId;
/**
- * Used in the blocking receive methods to receive a message from
- * the Session thread.
- * <p/>
- * Or to notify of errors
- * <p/>
- * Argument true indicates we want strict FIFO semantics
+ * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
+ * <p/> Argument true indicates we want strict FIFO semantics
*/
private final ArrayBlockingQueue _synchronousQueue;
@@ -96,55 +81,48 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private AMQProtocolHandler _protocolHandler;
- /**
- * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
- */
+ /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
private FieldTable _rawSelectorFieldTable;
/**
- * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
+ * failover
*/
private int _prefetchHigh;
/**
- * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
+ * failover
*/
private int _prefetchLow;
- /**
- * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
- */
+ /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */
private boolean _exclusive;
/**
- * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes
- * per consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
+ * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
+ * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
* implementation.
*/
private int _acknowledgeMode;
- /**
- * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
- */
+ /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
private int _outstanding;
- /**
- * Tag of last message delievered, whoch should be acknowledged on commit in
- * transaction mode.
- */
+ /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */
private long _lastDeliveryTag;
/**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode.
- * Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow
+ * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
+ * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
*/
private boolean _dups_ok_acknowledge_send;
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
/**
- * The thread that was used to call receive(). This is important for being able to interrupt that thread if
- * a receive() is in progress.
+ * The thread that was used to call receive(). This is important for being able to interrupt that thread if a
+ * receive() is in progress.
*/
private Thread _receivingThread;
@@ -417,13 +395,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
/**
- * We can get back either a Message or an exception from the queue. This method examines the argument and deals
- * with it by throwing it (if an exception) or returning it (in any other case).
+ * We can get back either a Message or an exception from the queue. This method examines the argument and deals with
+ * it by throwing it (if an exception) or returning it (in any other case).
*
* @param o
+ *
* @return a message only if o is a Message
- * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not
- * a JMSException is created with the linked exception set appropriately
+ *
+ * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
+ * JMSException is created with the linked exception set appropriately
*/
private AbstractJMSMessage returnMessageOrThrow(Object o)
throws JMSException
@@ -488,9 +468,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
/**
- * Called when you need to invalidate a consumer. Used for example when failover has occurred and the
- * client has vetoed automatic resubscription.
- * The caller must hold the failover mutex.
+ * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
+ * vetoed automatic resubscription. The caller must hold the failover mutex.
*/
void markClosed()
{
@@ -499,8 +478,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
/**
- * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case
- * of a message listener or a synchronous receive() caller.
+ * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a
+ * message listener or a synchronous receive() caller.
*
* @param messageFrame the raw unprocessed mesage
* @param channelId channel on which this message was sent
@@ -643,9 +622,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- */
+ /** Acknowledge up to last message delivered (if any). Used when commiting. */
void acknowledgeLastDelivered()
{
if (_lastDeliveryTag > 0)
@@ -676,8 +653,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
- * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean
- * case and in the case of an error occurring.
+ * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in
+ * the case of an error occurring.
*/
private void deregisterConsumer()
{
@@ -728,9 +705,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- /**
- * Called on recovery to reset the list of delivery tags
- */
+ /** Called on recovery to reset the list of delivery tags */
public void clearUnackedMessages()
{
_unacknowledgedDeliveryTags.clear();
@@ -760,4 +735,42 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+
+ public void rollback()
+ {
+
+ if (_synchronousQueue.size() > 0)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag);
+ }
+ for (Object o : _synchronousQueue)
+ {
+ if (o instanceof AbstractJMSMessage)
+ {
+// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
+
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejected message" + o);
+ }
+
+ }
+ else
+ {
+ _logger.error("Queue contained a :" + o.getClass() +
+ " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ }
+ }
+
+ if (_synchronousQueue.size() != 0)
+ {
+ _logger.warn("Queue was not empty after rejecting all messages");
+ }
+
+ _synchronousQueue.clear();
+ }
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 7cadbd409a..e2b101ab79 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -64,7 +64,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
protocolSession.writeFrame(frame);
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ }
if (errorCode == AMQConstant.NO_CONSUMERS)
{
throw new AMQNoConsumersException("Error: " + reason, null);
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 5b7144b524..03e7d399ce 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -20,23 +20,23 @@
*/
package org.apache.qpid.client.util;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.log4j.Logger;
+
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Iterator;
/**
- * A blocking queue that emits events above a user specified threshold allowing
- * the caller to take action (e.g. flow control) to try to prevent the queue
- * growing (much) further. The underlying queue itself is not bounded therefore
- * the caller is not obliged to react to the events.
- * <p/>
- * This implementation is <b>only</b> safe where we have a single thread adding
- * items and a single (different) thread removing items.
+ * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
+ * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
+ * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single
+ * thread adding items and a single (different) thread removing items.
*/
public class FlowControllingBlockingQueue
{
- /**
- * This queue is bounded and is used to store messages before being dispatched to the consumer
- */
+ /** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final BlockingQueue _queue = new LinkedBlockingQueue();
private final int _flowControlHighThreshold;
@@ -44,12 +44,10 @@ public class FlowControllingBlockingQueue
private final ThresholdListener _listener;
- /**
- * We require a separate count so we can track whether we have reached the
- * threshold
- */
+ /** We require a separate count so we can track whether we have reached the threshold */
private int _count;
+
public interface ThresholdListener
{
void aboveThreshold(int currentValue);
@@ -74,7 +72,7 @@ public class FlowControllingBlockingQueue
Object o = _queue.take();
if (_listener != null)
{
- synchronized(_listener)
+ synchronized (_listener)
{
if (_count-- == _flowControlLowThreshold)
{
@@ -90,7 +88,7 @@ public class FlowControllingBlockingQueue
_queue.add(o);
if (_listener != null)
{
- synchronized(_listener)
+ synchronized (_listener)
{
if (++_count == _flowControlHighThreshold)
{
@@ -99,5 +97,10 @@ public class FlowControllingBlockingQueue
}
}
}
+
+ public Iterator iterator()
+ {
+ return _queue.iterator();
+ }
}