summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/etc/log4j.xml5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java98
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java59
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java39
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java111
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java49
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java77
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java151
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java5
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java5
-rw-r--r--qpid/specs/amqp.0-8.xml13
17 files changed, 473 insertions, 179 deletions
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml
index 28a572eac9..810b65798e 100644
--- a/qpid/java/broker/etc/log4j.xml
+++ b/qpid/java/broker/etc/log4j.xml
@@ -46,6 +46,11 @@
<priority value="info"/>
</category>
+ <category name="org.apache.qpid.framing.AMQDataBlockEncoder">
+ <priority value="info"/>
+ </category>
+
+
<category name="org.apache.qpid">
<priority value="warn"/>
</category>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index a85db9f26b..e1b6497062 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -308,6 +308,10 @@ public class AMQChannel
public void unsubscribeConsumer(AMQProtocolSession session, String consumerTag) throws AMQException
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Unsubscribed consumer:" + consumerTag);
+ }
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
{
@@ -350,9 +354,17 @@ public class AMQChannel
* @param message
* @param deliveryTag
* @param queue
+ * @param consumerTag
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Adding unackedMessage (" + System.identityHashCode(message) + ") for channel " + _channelId +
+ " with delivery tag " + deliveryTag + " and consumerTag " + consumerTag +
+ " from queue:" + queue.getName());
+ }
+
synchronized (_unacknowledgedMessageMapLock)
{
_unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
@@ -364,6 +376,8 @@ public class AMQChannel
/**
* Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
* this same channel or to other subscribers.
+ *
+ * @throws org.apache.qpid.AMQException if delivery failes
*/
public void requeue() throws AMQException
{
@@ -372,7 +386,12 @@ public class AMQChannel
synchronized (_unacknowledgedMessageMapLock)
{
currentList = _unacknowledgedMessageMap;
- _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+ _unacknowledgedMessageMap = newUnacknowledgedMap();
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Requeuing " + currentList.size() + " messages for channel:" + System.identityHashCode(this));
}
for (UnacknowledgedMessage unacked : currentList.values())
@@ -391,62 +410,61 @@ public class AMQChannel
/** Called to resend all outstanding unacknowledged messages to this same channel. */
public void resend() throws AMQException
{
- //messages go to this channel
+ Map<Long, UnacknowledgedMessage> currentList;
+
synchronized (_unacknowledgedMessageMapLock)
{
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> messageSetIterator =
- _unacknowledgedMessageMap.entrySet().iterator();
-
- while (messageSetIterator.hasNext())
- {
- Map.Entry<Long, UnacknowledgedMessage> entry = messageSetIterator.next();
+ currentList = _unacknowledgedMessageMap;
+ _unacknowledgedMessageMap = newUnacknowledgedMap();
+ }
- //long deliveryTag = entry.getKey();
- String consumerTag = entry.getValue().consumerTag;
+ for (Map.Entry<Long, UnacknowledgedMessage> entry : currentList.entrySet())
+ {
+ UnacknowledgedMessage unacked = entry.getValue();
- if (_consumerTag2QueueMap.containsKey(consumerTag))
- {
- AMQMessage msg = entry.getValue().message;
- msg.setRedelivered(true);
- Subscription sub = msg.getDeliveredSubscription();
+ String consumerTag = unacked.consumerTag;
- if (sub != null)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Requeuing " + msg + " for resend");
- }
+ if (_consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ AMQMessage msg = entry.getValue().message;
+ msg.setRedelivered(true);
+ Subscription sub = msg.getDeliveredSubscription();
- sub.addToResendQueue(msg);
- }
- else
+ if (sub != null)
+ {
+ if (_log.isDebugEnabled())
{
- _log.error("DeliveredSubscription not recorded");
+ _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend");
}
- // Don't write the frame as the DeliveryManager can now deal with it
- //session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+ sub.addToResendQueue(msg);
}
else
- { // The current consumer has gone so we need to requeue
+ {
+ _log.error("DeliveredSubscription not recorded");
+ }
- UnacknowledgedMessage unacked = entry.getValue();
+ // Don't write the frame as the DeliveryManager can now deal with it
+ //session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+ }
+ else
+ { // The current consumer has gone so we need to requeue
- if (unacked.queue != null)
- {
- unacked.message.setTxnBuffer(null);
+ if (unacked.queue != null)
+ {
+ unacked.message.setTxnBuffer(null);
- unacked.message.release();
+ unacked.message.release();
- unacked.queue.deliver(unacked.message);
- }
- // delete the requeued message.
- messageSetIterator.remove();
+ unacked.queue.deliver(unacked.message);
}
}
}
+ }
- //fixme need to start the async delivery here.
+ private Map<Long, UnacknowledgedMessage> newUnacknowledgedMap()
+ {
+ return new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
}
/**
@@ -541,9 +559,9 @@ public class AMQChannel
private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
{
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
+ _log.trace("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
" and multiple " + multiple);
}
if (multiple)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index f83d38ad47..7c8a91870c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -49,11 +50,6 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
_logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
- if (channel == null)
- {
- throw new AMQException("Unknown channel " + evt.getChannelId());
- }
-
if (evt.getMethod().getRequeue())
{
//fixme need tests to exercise
@@ -63,5 +59,11 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
{
channel.resend();
}
+
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(BasicRecoverOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 1d11f6297b..f09142b4cb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -54,14 +54,14 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
channel.rollback();
+ //The DeliveryManager will now be responsible for dispatching these messages.
+ // So call resend to put them on the correct resend queue.
+ channel.resend();
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
-
- //Now resend all the unacknowledged messages back to the original subscribers.
- //(Must be done after the TxnRollback-ok response).
- channel.resend();
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 89807fb043..20d706b2c6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -136,7 +136,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
catch (RuntimeException e)
{
e.printStackTrace();
- // throw e;
+ // throw e;
}
}
@@ -183,12 +183,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
String locales = "en_US";
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
- _major, _minor, // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short)_major, // versionMajor
- (short)_minor); // versionMinor
+ _major, _minor, // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short) _major, // versionMajor
+ (short) _minor); // versionMinor
_minaProtocolSession.write(response);
}
catch (AMQException e)
@@ -307,8 +307,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
/**
- * Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
*
* @param frame the frame to write
*/
@@ -335,14 +335,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public AMQChannel getChannel(int channelId) throws AMQException
{
- return _channelMap.get(channelId);
+ AMQChannel channel = _channelMap.get(channelId);
+
+ if (channel == null)
+ {
+ throw new AMQException("Unknown channel " + channelId);
+ }
+ else
+ {
+ return channel;
+ }
}
public void addChannel(AMQChannel channel) throws AMQException
{
if (_closed)
{
- throw new AMQException("Session is closed");
+ throw new AMQException("Session is closed");
}
_channelMap.put(channel.getChannelId(), channel);
@@ -385,12 +394,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
/**
- * Close a specific channel. This will remove any resources used by the channel, including:
- * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
- * </ul>
+ * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
+ * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
*
* @param channelId id of the channel to close
- * @throws AMQException if an error occurs closing the channel
+ *
+ * @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
public void closeChannel(int channelId) throws AMQException
@@ -438,8 +447,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
/**
- * Closes all channels that were opened by this protocol session. This frees up all resources
- * used by the channel.
+ * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
*
* @throws AMQException if an error occurs while closing any channel
*/
@@ -452,10 +460,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_channelMap.clear();
}
- /**
- * This must be called when the session is _closed in order to free up any resources
- * managed by the session.
- */
+ /** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
if (!_closed)
@@ -479,17 +484,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
}
- /**
- * @return an object that can be used to identity
- */
+ /** @return an object that can be used to identity */
public Object getKey()
{
return _minaProtocolSession.getRemoteAddress();
}
/**
- * Get the fully qualified domain name of the local address to which this session is bound. Since some servers
- * may be bound to multiple addresses this could vary depending on the acceptor this session was created from.
+ * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
+ * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
*
* @return a String FQDN
*/
@@ -533,8 +536,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
/**
- * Convenience methods for managing AMQP version.
- * NOTE: Both major and minor will be set to 0 prior to protocol initiation.
+ * Convenience methods for managing AMQP version. NOTE: Both major and minor will be set to 0 prior to protocol
+ * initiation.
*/
public byte getAmqpMajor()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index bcad8e7d14..99bf9ca31d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -343,7 +343,7 @@ public class AMQQueue implements Managable, Comparable
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, _deliveryMgr);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
if (subscription.hasFilters())
{
@@ -364,14 +364,15 @@ public class AMQQueue implements Managable, Comparable
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
ps,
- consumerTag,
- _deliveryMgr)))
+ consumerTag)))
== null)
{
throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag +
" and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
+ removedSubscription.close();
+
// if we are eligible for auto deletion, unregister from the queue registry
if (_autoDelete && _subscribers.isEmpty())
{
@@ -543,6 +544,10 @@ public class AMQQueue implements Managable, Comparable
_maximumMessageAge = maximumMessageAge;
}
+ public void setQueueHasContent(boolean b, SubscriptionImpl subscription)
+ {
+ _deliveryMgr.setQueueHasContent(b, subscription);
+ }
private class Deliver implements TxnOp
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index f66604a5c1..1a26bab011 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -31,17 +31,13 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
-import java.util.HashMap;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Map;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicInteger;
/** Manages delivery of messages on behalf of a queue */
@@ -154,13 +150,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
}
- public void setQueueHasContent(Subscription subscription)
+ public void setQueueHasContent(boolean hasContent, Subscription subscription)
{
_lock.lock();
try
{
- _log.debug("Queue has content Set");
+ _log.trace("Queue has content Set");
_hasContent.add(subscription);
}
finally
@@ -236,8 +232,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (messageQueue == null)
{
- // There is no queue with messages currently
- _log.warn(sub + ": asked to send messages but has none on given queue:" + queue);
+ // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+ }
return;
}
AMQMessage message = null;
@@ -252,7 +251,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message:" + message + " to :" + this);
+ _log.debug("Async Delivery Message (" + System.identityHashCode(message) + ") to :" + System.identityHashCode(this));
}
sub.send(message, queue);
@@ -266,6 +265,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (messageQueue == sub.getResendQueue())
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("All messages sent from resendQueue for " + sub);
+ }
+
_hasContent.remove(sub);
}
else if (messageQueue == sub.getPreDeliveryQueue())
@@ -299,11 +303,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended())
+ // Ensure only we are processing the subscribers. getNextQueue as if the subscriber has a resend queue
+ // they may close and start to empty it themselves.
+ synchronized (sub.sendlock())
{
- sendNextMessage(sub, _queue);
+ if (!sub.isSuspended())
+ {
+ sendNextMessage(sub, _queue);
- hasSubscribers = true;
+ hasSubscribers = true;
+ }
}
}
}
@@ -316,9 +325,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void deliver(String name, AMQMessage msg) throws FailedDequeueException
{
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug(id() + "deliver :" + System.identityHashCode(msg));
+ _log.trace(id() + "deliver :" + System.identityHashCode(msg));
}
//Check if we have someone to deliver the message to.
@@ -436,7 +445,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
_log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ") hasContent:"
- + _hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() +
+ + !_hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() +
" Processing:" + _processing.get());
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index e4242f497a..2b9ea5e866 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -81,5 +81,5 @@ interface DeliveryManager
long getOldestMessageArrival();
- void setQueueHasContent(Subscription subscription);
+ void setQueueHasContent(boolean hasContent, Subscription subscription);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index 30b446c309..fe75d25241 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -51,4 +51,6 @@ public interface Subscription
Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);
void addToResendQueue(AMQMessage msg);
+
+ Object sendlock();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
index ba31ca19b5..52c728a7dd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
@@ -34,9 +34,8 @@ import org.apache.qpid.framing.FieldTable;
public interface SubscriptionFactory
{
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
- FieldTable filters, boolean noLocal, DeliveryManager deliveryManager) throws AMQException;
+ FieldTable filters, boolean noLocal, AMQQueue queue) throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
- DeliveryManager deliveryManager) throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index d43e20eb89..a53e305e49 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
@@ -67,35 +68,35 @@ public class SubscriptionImpl implements Subscription
private final Boolean _autoClose;
private boolean _closed = false;
- private DeliveryManager _deliveryManager;
+ private AMQQueue _queue;
+ private final AtomicBoolean _resending = new AtomicBoolean(false);
public static class Factory implements SubscriptionFactory
{
public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
boolean acks, FieldTable filters, boolean noLocal,
- DeliveryManager deliveryManager) throws AMQException
+ AMQQueue queue) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, deliveryManager);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, queue);
}
- public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
- DeliveryManager deliveryManager)
+ public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, deliveryManager);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, null);
}
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks, DeliveryManager deliveryManager)
+ String consumerTag, boolean acks, AMQQueue queue)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null, false, deliveryManager);
+ this(channelId, protocolSession, consumerTag, acks, null, false, queue);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
String consumerTag, boolean acks, FieldTable filters, boolean noLocal,
- DeliveryManager deliveryManager)
+ AMQQueue queue)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
@@ -110,7 +111,7 @@ public class SubscriptionImpl implements Subscription
sessionKey = protocolSession.getKey();
_acks = acks;
_noLocal = noLocal;
- _deliveryManager = deliveryManager;
+ _queue = queue;
_filters = FilterManagerFactory.createManager(filters);
@@ -238,9 +239,12 @@ public class SubscriptionImpl implements Subscription
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+ //fixme what is wrong with this?
+ //AMQDataBlock frame = msg.getDataBlock(channel.getChannelId(),consumerTag,deliveryTag);
+
protocolSession.writeFrame(frame);
}
}
@@ -271,9 +275,12 @@ public class SubscriptionImpl implements Subscription
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+ //fixme what is wrong with this?
+ //AMQDataBlock frame = msg.getDataBlock(channel.getChannelId(),consumerTag,deliveryTag);
+
protocolSession.writeFrame(frame);
}
}
@@ -285,7 +292,7 @@ public class SubscriptionImpl implements Subscription
public boolean isSuspended()
{
- return channel.isSuspended();
+ return channel.isSuspended() && !_resending.get();
}
/**
@@ -379,12 +386,20 @@ public class SubscriptionImpl implements Subscription
public void close()
{
+ _logger.info("Closing subscription:" + this);
+
if (_resendQueue != null && !_resendQueue.isEmpty())
{
requeue();
}
- if (!_closed)
+ //remove references in PDQ
+ if (_messages != null)
+ {
+ _messages.clear();
+ }
+
+ if (_autoClose && !_closed)
{
_logger.info("Closing autoclose subscription:" + this);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -400,8 +415,59 @@ public class SubscriptionImpl implements Subscription
private void requeue()
{
- //fixme
- _logger.error("MESSAGES LOST as subscription hasn't yet resent all its requeued messages");
+
+ if (_queue != null)
+ {
+ _logger.trace("Requeuing :" + _resendQueue.size() + " messages");
+
+ //Take control over to this thread for delivering messages from the Async Delivery.
+ setResending(true);
+
+ while (!_resendQueue.isEmpty())
+ {
+ AMQMessage resent = _resendQueue.poll();
+
+ resent.setTxnBuffer(null);
+
+ resent.release();
+
+ try
+ {
+ _queue.deliver(resent);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Unable to re-deliver messages", e);
+ }
+ }
+
+ setResending(false);
+
+ if (!_resendQueue.isEmpty())
+ {
+ _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null.");
+ }
+
+ _queue.setQueueHasContent(false, this);
+ }
+ else
+ {
+ if (!_resendQueue.isEmpty())
+ {
+ _logger.error("Unable to re-deliver messages as queue is null.");
+ }
+ }
+
+ // Clear the messages
+ _resendQueue = null;
+ }
+
+ private void setResending(boolean resending)
+ {
+ synchronized (_resending)
+ {
+ _resending.set(resending);
+ }
}
public boolean isBrowser()
@@ -450,17 +516,22 @@ public class SubscriptionImpl implements Subscription
getResendQueue().add(msg);
// Mark Queue has having content.
- if (_deliveryManager == null)
+ if (_queue == null)
{
_logger.error("Delivery Manager is null won't be able to resend messages");
}
else
{
- _deliveryManager.setQueueHasContent(this);
+ _queue.setQueueHasContent(true, this);
}
}
- private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+ public Object sendlock()
+ {
+ return _resending;
+ }
+
+ private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange, boolean redelivered)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -470,7 +541,7 @@ public class SubscriptionImpl implements Subscription
consumerTag, // consumerTag
deliveryTag, // deliveryTag
exchange, // exchange
- false, // redelivered
+ redelivered, // redelivered
routingKey // routingKey
);
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 91e720ea54..c4dab50ff4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -27,27 +27,18 @@ import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
-/**
- * Holds a set of subscriptions for a queue and manages the round
- * robin-ing of deliver etc.
- */
+/** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */
class SubscriptionSet implements WeightedSubscriptionManager
{
private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
- /**
- * List of registered subscribers
- */
+ /** List of registered subscribers */
private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
- /**
- * Used to control the round robin delivery of content
- */
+ /** Used to control the round robin delivery of content */
private int _currentSubscriber;
- /**
- * Accessor for unit tests.
- */
+ /** Accessor for unit tests. */
int getCurrentSubscriber()
{
return _currentSubscriber;
@@ -62,14 +53,19 @@ class SubscriptionSet implements WeightedSubscriptionManager
* Remove the subscription, returning it if it was found
*
* @param subscription
+ *
* @return null if no match was found
*/
public Subscription removeSubscriber(Subscription subscription)
{
- boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here.
- if (isRemoved)
+ // TODO: possibly need O(1) operation here.
+ int subIndex = _subscriptions.indexOf(subscription);
+
+ if (subIndex != -1)
{
- return subscription;
+ //we can't just return the passed in subscription as it is a new object
+ // and doesn't contain the stored state we need.
+ return _subscriptions.remove(subIndex);
}
else
{
@@ -92,14 +88,11 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
/**
- * Return the next unsuspended subscription or null if not found.
- * <p/>
- * Performance note:
- * This method can scan all items twice when looking for a subscription that is not
- * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
- * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of
- * race conditions and when subscriptions are removed between calls to nextSubscriber, the
- * IndexOutOfBoundsException also causes the scan to start at the beginning.
+ * Return the next unsuspended subscription or null if not found. <p/> Performance note: This method can scan all
+ * items twice when looking for a subscription that is not suspended. The worst case occcurs when all subscriptions
+ * are suspended. However, it is does this without synchronisation and subscriptions may be added and removed
+ * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
+ * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
*/
public Subscription nextSubscriber(AMQMessage msg)
{
@@ -156,9 +149,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
return null;
}
- /**
- * Overridden in test classes.
- */
+ /** Overridden in test classes. */
protected void subscriberScanned()
{
}
@@ -199,8 +190,8 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
/**
- * Notification that a queue has been deleted. This is called so that the subscription can inform the
- * channel, which in turn can update its list of unacknowledged messages.
+ * Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which
+ * in turn can update its list of unacknowledged messages.
*
* @param queue
*/
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index eb29d9d805..8dce5d4494 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -805,42 +805,57 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// this is set only here, and the before the consumer's onMessage is called it is set to false
_inRecovery = true;
- boolean isSuspended = isSuspended();
-
- if (!isSuspended)
+ try
{
- try
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
{
- suspendChannel(true);
+ try
+ {
+ suspendChannel(true);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
- catch (AMQException e)
+ for (BasicMessageConsumer consumer : _consumers.values())
{
- throw new JMSAMQException(e);
+ consumer.clearUnackedMessages();
}
- }
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- consumer.clearUnackedMessages();
- }
-
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- false)); // requeue
- if (!isSuspended)
- {
- try
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ false) // requeue
+ , BasicRecoverOkBody.class);
+
+
+ if (_dispatcher != null)
{
- suspendChannel(false);
+ _dispatcher.rollback();
}
- catch (AMQException e)
+
+ if (!isSuspended)
{
- throw new JMSAMQException(e);
+ try
+ {
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
}
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
@@ -1873,9 +1888,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void confirmConsumerCancelled(String consumerTag)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
- if ((consumer != null) && (consumer.isAutoClose()))
+ if (consumer != null)
{
- consumer.closeWhenNoMessages(true);
+ if (consumer.isAutoClose())
+ {
+ consumer.closeWhenNoMessages(true);
+ }
+ //fixme seems abit like a hack
+// else
+// {
+// consumer.rollback();
+// }
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index c5ac530297..2bec9dbcdb 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -22,8 +22,11 @@ package org.apache.qpid.test.unit.transacted;
import junit.framework.TestCase;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.message.AMQMessage;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.log4j.Logger;
@@ -34,6 +37,8 @@ import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
+import java.util.Set;
+import java.util.HashSet;
/**
* This class tests a number of commits and roll back scenarios
@@ -52,6 +57,7 @@ public class CommitRollbackTest extends TestCase
Queue _jmsQueue;
private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class);
+ private final int ACK_MODE = Session.CLIENT_ACKNOWLEDGE;
protected void setUp() throws Exception
{
@@ -64,12 +70,12 @@ public class CommitRollbackTest extends TestCase
{
conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'");
- _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ _session = conn.createSession(true, ACK_MODE);
_jmsQueue = _session.createQueue(queue);
_consumer = _session.createConsumer(_jmsQueue);
- _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ _pubSession = conn.createSession(true, ACK_MODE);
_publisher = _pubSession.createProducer(_pubSession.createQueue(queue));
@@ -129,6 +135,7 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
Message result = _consumer.receive(1000);
+ assertTrue("Redelivered not true", result.getJMSRedelivered());
assertNull("test message was put and rolled back, but is still present", result);
}
@@ -247,7 +254,7 @@ public class CommitRollbackTest extends TestCase
assertTrue("session is not transacted", _pubSession.getTransacted());
_logger.info("sending test message");
- String MESSAGE_TEXT = "testGetThenDisconnect";
+ String MESSAGE_TEXT = "testGetThenRollback";
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
_pubSession.commit();
@@ -258,6 +265,7 @@ public class CommitRollbackTest extends TestCase
assertNotNull("retrieved message is null", msg);
assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
+ assertTrue("Message redelivered not set", !msg.getJMSRedelivered());
_logger.info("rolling back");
@@ -270,6 +278,7 @@ public class CommitRollbackTest extends TestCase
_session.commit();
assertNotNull("test message was consumed and rolled back, but is gone", result);
assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ assertTrue("Message redelivered not set", result.getJMSRedelivered());
}
@@ -296,6 +305,142 @@ public class CommitRollbackTest extends TestCase
Message result = _consumer.receive(1000);
assertNull("test message should be null", result);
+
+ _session.commit();
}
+ /**
+ * Test that Closing a consumer and then connection while messags are being resent from a rolling back get correctly
+ * requeued a session purges the dispatcher queue, and the messages arrive in the correct order
+ *
+ * @throws Exception if something goes wrong
+ */
+ public void testRollbackWithConsumerConnectionClose() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending two test messages");
+
+ int MESSAGE_TO_SEND = 1000;
+
+ for (int count = 0; count < MESSAGE_TO_SEND; count++)
+ {
+ _publisher.send(_pubSession.createTextMessage(String.valueOf(count)));
+ }
+
+ _pubSession.commit();
+
+ _logger.info("getting a few messages");
+
+ for (int count = 0; count < MESSAGE_TO_SEND / 2; count++)
+ {
+ assertEquals(String.valueOf(count), ((TextMessage) _consumer.receive(1000)).getText());
+ }
+
+
+ _logger.info("rolling back");
+ _session.rollback();
+
+ _logger.info("closing consumer");
+ _consumer.close();
+ _logger.info("closed consumer");
+
+ _logger.info("close connection");
+ conn.close();
+ _logger.info("closed connection");
+
+ newConnection();
+
+ _logger.info("getting all messages");
+
+ Set<String> results = new HashSet<String>();
+ for (int count = 0; count < MESSAGE_TO_SEND; count++)
+ {
+ TextMessage msg = ((TextMessage) _consumer.receive(1000));
+
+ assertNotNull("Message should not be null, count:" + count, msg);
+ String txt = msg.getText();
+ _logger.trace("Received msg:" + txt + ":" + ((AMQMessage) msg).getDeliveryTag());
+ results.add(txt);
+ }
+
+
+ Message result = _consumer.receive(1000);
+ assertNull("test message should be null", result);
+
+ assertEquals("All messages not received", MESSAGE_TO_SEND, results.size());
+
+ _session.commit();
+ }
+
+
+ /**
+ * Test that Closing a consumer and then session while messags are being resent from a rollback get correctly
+ * requeued, a session purges the dispatcher queue, and the messages arrive in the correct order
+ *
+ * @throws Exception if something goes wrong
+ */
+ public void testRollbackWithConsumerAndSessionClose() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending two test messages");
+
+ int MESSAGE_TO_SEND = 1000;
+
+ for (int count = 0; count < MESSAGE_TO_SEND; count++)
+ {
+ _publisher.send(_pubSession.createTextMessage(String.valueOf(count)));
+ }
+
+ _pubSession.commit();
+
+ _logger.info("getting a few messages");
+
+ for (int count = 0; count < MESSAGE_TO_SEND / 2; count++)
+ {
+ assertEquals(String.valueOf(count), ((TextMessage) _consumer.receive(1000)).getText());
+ }
+
+
+ _logger.info("rolling back");
+ _session.rollback();
+
+ _logger.info("closing consumer");
+ _consumer.close();
+ _logger.info("closed consumer");
+
+ _logger.info("closing session");
+ _session.close();
+ _logger.info("closed session");
+
+ _session = conn.createSession(true, ACK_MODE);
+
+ _consumer = _session.createConsumer(_jmsQueue);
+
+ _logger.info("getting all messages");
+
+ Set<String> results = new HashSet<String>();
+ for (int count = 0; count < MESSAGE_TO_SEND; count++)
+ {
+ TextMessage msg = ((TextMessage) _consumer.receive(1000));
+
+ assertNotNull("Message should not be null, count:" + count, msg);
+ String txt = msg.getText();
+ _logger.trace("Received msg:" + txt + ":" + ((AMQMessage) msg).getDeliveryTag());
+ results.add(txt);
+ }
+
+
+ Message result = _consumer.receive(1000);
+ assertNull("test message should be null:" + result, result);
+
+ assertEquals("All messages not received", MESSAGE_TO_SEND, results.size());
+
+ _session.commit();
+
+
+ }
}
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index f5d51b9826..c01a31a1bd 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -151,4 +151,9 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
{
//no-op
}
+
+ public Object sendlock()
+ {
+ return null;
+ }
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 27f9802fb5..903e74c1a1 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -121,6 +121,11 @@ public class SubscriptionTestHelper implements Subscription
//no-op
}
+ public Object sendlock()
+ {
+ return null;
+ }
+
public int hashCode()
{
return key.hashCode();
diff --git a/qpid/specs/amqp.0-8.xml b/qpid/specs/amqp.0-8.xml
index b84751c398..7b826a796c 100644
--- a/qpid/specs/amqp.0-8.xml
+++ b/qpid/specs/amqp.0-8.xml
@@ -169,6 +169,8 @@ Revision history:
unacknowledged messages on a channel.
2006-07-03 (PH) - cosmetic clean-up of Basic.Recover comments.
+
+ 2006-07-09 (MR) - added Basic.RecoverOk so we know when the recover has been done.
-->
<amqp major="8" minor="0" port="5672" comment="AMQ protocol 0.80">
@@ -2521,7 +2523,16 @@ localised reply text
The server MUST raise a channel exception if this is called on a
transacted channel.
</doc>
-</method>
+ <response name="rollback-ok"/>
+ </method>
+ <method name="recover-ok" synchronous="1" index="101">
+ confirm a successful recover
+ <doc>
+ This method confirms to the client that the recover succeeded.
+ Note that if an recover fails, the server raises a channel exception.
+ </doc>
+ <chassis name="client" implement="MUST"/>
+ </method>
</class>