diff options
17 files changed, 473 insertions, 179 deletions
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml index 28a572eac9..810b65798e 100644 --- a/qpid/java/broker/etc/log4j.xml +++ b/qpid/java/broker/etc/log4j.xml @@ -46,6 +46,11 @@ <priority value="info"/> </category> + <category name="org.apache.qpid.framing.AMQDataBlockEncoder"> + <priority value="info"/> + </category> + + <category name="org.apache.qpid"> <priority value="warn"/> </category> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a85db9f26b..e1b6497062 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -308,6 +308,10 @@ public class AMQChannel public void unsubscribeConsumer(AMQProtocolSession session, String consumerTag) throws AMQException { + if (_log.isTraceEnabled()) + { + _log.trace("Unsubscribed consumer:" + consumerTag); + } AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); if (q != null) { @@ -350,9 +354,17 @@ public class AMQChannel * @param message * @param deliveryTag * @param queue + * @param consumerTag */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { + if (_log.isTraceEnabled()) + { + _log.trace("Adding unackedMessage (" + System.identityHashCode(message) + ") for channel " + _channelId + + " with delivery tag " + deliveryTag + " and consumerTag " + consumerTag + + " from queue:" + queue.getName()); + } + synchronized (_unacknowledgedMessageMapLock) { _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); @@ -364,6 +376,8 @@ public class AMQChannel /** * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to * this same channel or to other subscribers. + * + * @throws org.apache.qpid.AMQException if delivery failes */ public void requeue() throws AMQException { @@ -372,7 +386,12 @@ public class AMQChannel synchronized (_unacknowledgedMessageMapLock) { currentList = _unacknowledgedMessageMap; - _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); + _unacknowledgedMessageMap = newUnacknowledgedMap(); + } + + if (_log.isDebugEnabled()) + { + _log.debug("Requeuing " + currentList.size() + " messages for channel:" + System.identityHashCode(this)); } for (UnacknowledgedMessage unacked : currentList.values()) @@ -391,62 +410,61 @@ public class AMQChannel /** Called to resend all outstanding unacknowledged messages to this same channel. */ public void resend() throws AMQException { - //messages go to this channel + Map<Long, UnacknowledgedMessage> currentList; + synchronized (_unacknowledgedMessageMapLock) { - Iterator<Map.Entry<Long, UnacknowledgedMessage>> messageSetIterator = - _unacknowledgedMessageMap.entrySet().iterator(); - - while (messageSetIterator.hasNext()) - { - Map.Entry<Long, UnacknowledgedMessage> entry = messageSetIterator.next(); + currentList = _unacknowledgedMessageMap; + _unacknowledgedMessageMap = newUnacknowledgedMap(); + } - //long deliveryTag = entry.getKey(); - String consumerTag = entry.getValue().consumerTag; + for (Map.Entry<Long, UnacknowledgedMessage> entry : currentList.entrySet()) + { + UnacknowledgedMessage unacked = entry.getValue(); - if (_consumerTag2QueueMap.containsKey(consumerTag)) - { - AMQMessage msg = entry.getValue().message; - msg.setRedelivered(true); - Subscription sub = msg.getDeliveredSubscription(); + String consumerTag = unacked.consumerTag; - if (sub != null) - { - if (_log.isDebugEnabled()) - { - _log.debug("Requeuing " + msg + " for resend"); - } + if (_consumerTag2QueueMap.containsKey(consumerTag)) + { + AMQMessage msg = entry.getValue().message; + msg.setRedelivered(true); + Subscription sub = msg.getDeliveredSubscription(); - sub.addToResendQueue(msg); - } - else + if (sub != null) + { + if (_log.isDebugEnabled()) { - _log.error("DeliveredSubscription not recorded"); + _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend"); } - // Don't write the frame as the DeliveryManager can now deal with it - //session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); + sub.addToResendQueue(msg); } else - { // The current consumer has gone so we need to requeue + { + _log.error("DeliveredSubscription not recorded"); + } - UnacknowledgedMessage unacked = entry.getValue(); + // Don't write the frame as the DeliveryManager can now deal with it + //session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); + } + else + { // The current consumer has gone so we need to requeue - if (unacked.queue != null) - { - unacked.message.setTxnBuffer(null); + if (unacked.queue != null) + { + unacked.message.setTxnBuffer(null); - unacked.message.release(); + unacked.message.release(); - unacked.queue.deliver(unacked.message); - } - // delete the requeued message. - messageSetIterator.remove(); + unacked.queue.deliver(unacked.message); } } } + } - //fixme need to start the async delivery here. + private Map<Long, UnacknowledgedMessage> newUnacknowledgedMap() + { + return new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); } /** @@ -541,9 +559,9 @@ public class AMQChannel private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException { - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag + + _log.trace("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag + " and multiple " + multiple); } if (multiple) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index f83d38ad47..7c8a91870c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRecoverOkBody; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; @@ -49,11 +50,6 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId()); AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); - if (channel == null) - { - throw new AMQException("Unknown channel " + evt.getChannelId()); - } - if (evt.getMethod().getRequeue()) { //fixme need tests to exercise @@ -63,5 +59,11 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic { channel.resend(); } + + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(BasicRecoverOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 1d11f6297b..f09142b4cb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -54,14 +54,14 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod channel.rollback(); + //The DeliveryManager will now be responsible for dispatching these messages. + // So call resend to put them on the correct resend queue. + channel.resend(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); - - //Now resend all the unacknowledged messages back to the original subscribers. - //(Must be done after the TxnRollback-ok response). - channel.resend(); } catch (AMQException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 89807fb043..20d706b2c6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -136,7 +136,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, catch (RuntimeException e) { e.printStackTrace(); - // throw e; + // throw e; } } @@ -183,12 +183,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, String locales = "en_US"; // Interfacing with generated code - be aware of possible changes to parameter order as versions change. AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, - _major, _minor, // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short)_major, // versionMajor - (short)_minor); // versionMinor + _major, _minor, // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short) _major, // versionMajor + (short) _minor); // versionMinor _minaProtocolSession.write(response); } catch (AMQException e) @@ -307,8 +307,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -335,14 +335,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public AMQChannel getChannel(int channelId) throws AMQException { - return _channelMap.get(channelId); + AMQChannel channel = _channelMap.get(channelId); + + if (channel == null) + { + throw new AMQException("Unknown channel " + channelId); + } + else + { + return channel; + } } public void addChannel(AMQChannel channel) throws AMQException { if (_closed) { - throw new AMQException("Session is closed"); + throw new AMQException("Session is closed"); } _channelMap.put(channel.getChannelId(), channel); @@ -385,12 +394,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } /** - * Close a specific channel. This will remove any resources used by the channel, including: - * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li> - * </ul> + * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue + * subscriptions (this may in turn remove queues if they are auto delete</li> </ul> * * @param channelId id of the channel to close - * @throws AMQException if an error occurs closing the channel + * + * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid */ public void closeChannel(int channelId) throws AMQException @@ -438,8 +447,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } /** - * Closes all channels that were opened by this protocol session. This frees up all resources - * used by the channel. + * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. * * @throws AMQException if an error occurs while closing any channel */ @@ -452,10 +460,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _channelMap.clear(); } - /** - * This must be called when the session is _closed in order to free up any resources - * managed by the session. - */ + /** This must be called when the session is _closed in order to free up any resources managed by the session. */ public void closeSession() throws AMQException { if (!_closed) @@ -479,17 +484,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; } - /** - * @return an object that can be used to identity - */ + /** @return an object that can be used to identity */ public Object getKey() { return _minaProtocolSession.getRemoteAddress(); } /** - * Get the fully qualified domain name of the local address to which this session is bound. Since some servers - * may be bound to multiple addresses this could vary depending on the acceptor this session was created from. + * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may + * be bound to multiple addresses this could vary depending on the acceptor this session was created from. * * @return a String FQDN */ @@ -533,8 +536,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } /** - * Convenience methods for managing AMQP version. - * NOTE: Both major and minor will be set to 0 prior to protocol initiation. + * Convenience methods for managing AMQP version. NOTE: Both major and minor will be set to 0 prior to protocol + * initiation. */ public byte getAmqpMajor() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index bcad8e7d14..99bf9ca31d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -343,7 +343,7 @@ public class AMQQueue implements Managable, Comparable { debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); - Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, _deliveryMgr); + Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); if (subscription.hasFilters()) { @@ -364,14 +364,15 @@ public class AMQQueue implements Managable, Comparable Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, - consumerTag, - _deliveryMgr))) + consumerTag))) == null) { throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag + " and protocol session key " + ps.getKey() + " not registered with queue " + this); } + removedSubscription.close(); + // if we are eligible for auto deletion, unregister from the queue registry if (_autoDelete && _subscribers.isEmpty()) { @@ -543,6 +544,10 @@ public class AMQQueue implements Managable, Comparable _maximumMessageAge = maximumMessageAge; } + public void setQueueHasContent(boolean b, SubscriptionImpl subscription) + { + _deliveryMgr.setQueueHasContent(b, subscription); + } private class Deliver implements TxnOp { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index f66604a5c1..1a26bab011 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -31,17 +31,13 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Queue; -import java.util.HashMap; -import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicInteger; /** Manages delivery of messages on behalf of a queue */ @@ -154,13 +150,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); } - public void setQueueHasContent(Subscription subscription) + public void setQueueHasContent(boolean hasContent, Subscription subscription) { _lock.lock(); try { - _log.debug("Queue has content Set"); + _log.trace("Queue has content Set"); _hasContent.add(subscription); } finally @@ -236,8 +232,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (messageQueue == null) { - // There is no queue with messages currently - _log.warn(sub + ": asked to send messages but has none on given queue:" + queue); + // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector + if (_log.isDebugEnabled()) + { + _log.debug(sub + ": asked to send messages but has none on given queue:" + queue); + } return; } AMQMessage message = null; @@ -252,7 +251,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message:" + message + " to :" + this); + _log.debug("Async Delivery Message (" + System.identityHashCode(message) + ") to :" + System.identityHashCode(this)); } sub.send(message, queue); @@ -266,6 +265,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (messageQueue == sub.getResendQueue()) { + if (_log.isTraceEnabled()) + { + _log.trace("All messages sent from resendQueue for " + sub); + } + _hasContent.remove(sub); } else if (messageQueue == sub.getPreDeliveryQueue()) @@ -299,11 +303,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager for (Subscription sub : _subscriptions.getSubscriptions()) { - if (!sub.isSuspended()) + // Ensure only we are processing the subscribers. getNextQueue as if the subscriber has a resend queue + // they may close and start to empty it themselves. + synchronized (sub.sendlock()) { - sendNextMessage(sub, _queue); + if (!sub.isSuspended()) + { + sendNextMessage(sub, _queue); - hasSubscribers = true; + hasSubscribers = true; + } } } } @@ -316,9 +325,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void deliver(String name, AMQMessage msg) throws FailedDequeueException { - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug(id() + "deliver :" + System.identityHashCode(msg)); + _log.trace(id() + "deliver :" + System.identityHashCode(msg)); } //Check if we have someone to deliver the message to. @@ -436,7 +445,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ") hasContent:" - + _hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() + + + !_hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() + " Processing:" + _processing.get()); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index e4242f497a..2b9ea5e866 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -81,5 +81,5 @@ interface DeliveryManager long getOldestMessageArrival(); - void setQueueHasContent(Subscription subscription); + void setQueueHasContent(boolean hasContent, Subscription subscription); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index 30b446c309..fe75d25241 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -51,4 +51,6 @@ public interface Subscription Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages); void addToResendQueue(AMQMessage msg); + + Object sendlock(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java index ba31ca19b5..52c728a7dd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java @@ -34,9 +34,8 @@ import org.apache.qpid.framing.FieldTable; public interface SubscriptionFactory { Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, - FieldTable filters, boolean noLocal, DeliveryManager deliveryManager) throws AMQException; + FieldTable filters, boolean noLocal, AMQQueue queue) throws AMQException; - Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, - DeliveryManager deliveryManager) throws AMQException; + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index d43e20eb89..a53e305e49 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; /** * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag @@ -67,35 +68,35 @@ public class SubscriptionImpl implements Subscription private final Boolean _autoClose; private boolean _closed = false; - private DeliveryManager _deliveryManager; + private AMQQueue _queue; + private final AtomicBoolean _resending = new AtomicBoolean(false); public static class Factory implements SubscriptionFactory { public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal, - DeliveryManager deliveryManager) throws AMQException + AMQQueue queue) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, deliveryManager); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, queue); } - public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, - DeliveryManager deliveryManager) + public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, deliveryManager); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, null); } } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks, DeliveryManager deliveryManager) + String consumerTag, boolean acks, AMQQueue queue) throws AMQException { - this(channelId, protocolSession, consumerTag, acks, null, false, deliveryManager); + this(channelId, protocolSession, consumerTag, acks, null, false, queue); } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal, - DeliveryManager deliveryManager) + AMQQueue queue) throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); @@ -110,7 +111,7 @@ public class SubscriptionImpl implements Subscription sessionKey = protocolSession.getKey(); _acks = acks; _noLocal = noLocal; - _deliveryManager = deliveryManager; + _queue = queue; _filters = FilterManagerFactory.createManager(filters); @@ -238,9 +239,12 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + //fixme what is wrong with this? + //AMQDataBlock frame = msg.getDataBlock(channel.getChannelId(),consumerTag,deliveryTag); + protocolSession.writeFrame(frame); } } @@ -271,9 +275,12 @@ public class SubscriptionImpl implements Subscription channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + //fixme what is wrong with this? + //AMQDataBlock frame = msg.getDataBlock(channel.getChannelId(),consumerTag,deliveryTag); + protocolSession.writeFrame(frame); } } @@ -285,7 +292,7 @@ public class SubscriptionImpl implements Subscription public boolean isSuspended() { - return channel.isSuspended(); + return channel.isSuspended() && !_resending.get(); } /** @@ -379,12 +386,20 @@ public class SubscriptionImpl implements Subscription public void close() { + _logger.info("Closing subscription:" + this); + if (_resendQueue != null && !_resendQueue.isEmpty()) { requeue(); } - if (!_closed) + //remove references in PDQ + if (_messages != null) + { + _messages.clear(); + } + + if (_autoClose && !_closed) { _logger.info("Closing autoclose subscription:" + this); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -400,8 +415,59 @@ public class SubscriptionImpl implements Subscription private void requeue() { - //fixme - _logger.error("MESSAGES LOST as subscription hasn't yet resent all its requeued messages"); + + if (_queue != null) + { + _logger.trace("Requeuing :" + _resendQueue.size() + " messages"); + + //Take control over to this thread for delivering messages from the Async Delivery. + setResending(true); + + while (!_resendQueue.isEmpty()) + { + AMQMessage resent = _resendQueue.poll(); + + resent.setTxnBuffer(null); + + resent.release(); + + try + { + _queue.deliver(resent); + } + catch (AMQException e) + { + _logger.error("Unable to re-deliver messages", e); + } + } + + setResending(false); + + if (!_resendQueue.isEmpty()) + { + _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null."); + } + + _queue.setQueueHasContent(false, this); + } + else + { + if (!_resendQueue.isEmpty()) + { + _logger.error("Unable to re-deliver messages as queue is null."); + } + } + + // Clear the messages + _resendQueue = null; + } + + private void setResending(boolean resending) + { + synchronized (_resending) + { + _resending.set(resending); + } } public boolean isBrowser() @@ -450,17 +516,22 @@ public class SubscriptionImpl implements Subscription getResendQueue().add(msg); // Mark Queue has having content. - if (_deliveryManager == null) + if (_queue == null) { _logger.error("Delivery Manager is null won't be able to resend messages"); } else { - _deliveryManager.setQueueHasContent(this); + _queue.setQueueHasContent(true, this); } } - private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) + public Object sendlock() + { + return _resending; + } + + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange, boolean redelivered) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -470,7 +541,7 @@ public class SubscriptionImpl implements Subscription consumerTag, // consumerTag deliveryTag, // deliveryTag exchange, // exchange - false, // redelivered + redelivered, // redelivered routingKey // routingKey ); ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 91e720ea54..c4dab50ff4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -27,27 +27,18 @@ import java.util.List; import java.util.ListIterator; import java.util.concurrent.CopyOnWriteArrayList; -/** - * Holds a set of subscriptions for a queue and manages the round - * robin-ing of deliver etc. - */ +/** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */ class SubscriptionSet implements WeightedSubscriptionManager { private static final Logger _log = Logger.getLogger(SubscriptionSet.class); - /** - * List of registered subscribers - */ + /** List of registered subscribers */ private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>(); - /** - * Used to control the round robin delivery of content - */ + /** Used to control the round robin delivery of content */ private int _currentSubscriber; - /** - * Accessor for unit tests. - */ + /** Accessor for unit tests. */ int getCurrentSubscriber() { return _currentSubscriber; @@ -62,14 +53,19 @@ class SubscriptionSet implements WeightedSubscriptionManager * Remove the subscription, returning it if it was found * * @param subscription + * * @return null if no match was found */ public Subscription removeSubscriber(Subscription subscription) { - boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here. - if (isRemoved) + // TODO: possibly need O(1) operation here. + int subIndex = _subscriptions.indexOf(subscription); + + if (subIndex != -1) { - return subscription; + //we can't just return the passed in subscription as it is a new object + // and doesn't contain the stored state we need. + return _subscriptions.remove(subIndex); } else { @@ -92,14 +88,11 @@ class SubscriptionSet implements WeightedSubscriptionManager } /** - * Return the next unsuspended subscription or null if not found. - * <p/> - * Performance note: - * This method can scan all items twice when looking for a subscription that is not - * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this - * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of - * race conditions and when subscriptions are removed between calls to nextSubscriber, the - * IndexOutOfBoundsException also causes the scan to start at the beginning. + * Return the next unsuspended subscription or null if not found. <p/> Performance note: This method can scan all + * items twice when looking for a subscription that is not suspended. The worst case occcurs when all subscriptions + * are suspended. However, it is does this without synchronisation and subscriptions may be added and removed + * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to + * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning. */ public Subscription nextSubscriber(AMQMessage msg) { @@ -156,9 +149,7 @@ class SubscriptionSet implements WeightedSubscriptionManager return null; } - /** - * Overridden in test classes. - */ + /** Overridden in test classes. */ protected void subscriberScanned() { } @@ -199,8 +190,8 @@ class SubscriptionSet implements WeightedSubscriptionManager } /** - * Notification that a queue has been deleted. This is called so that the subscription can inform the - * channel, which in turn can update its list of unacknowledged messages. + * Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which + * in turn can update its list of unacknowledged messages. * * @param queue */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index eb29d9d805..8dce5d4494 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -805,42 +805,57 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // this is set only here, and the before the consumer's onMessage is called it is set to false _inRecovery = true; - boolean isSuspended = isSuspended(); - - if (!isSuspended) + try { - try + + boolean isSuspended = isSuspended(); + + if (!isSuspended) { - suspendChannel(true); + try + { + suspendChannel(true); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } - catch (AMQException e) + for (BasicMessageConsumer consumer : _consumers.values()) { - throw new JMSAMQException(e); + consumer.clearUnackedMessages(); } - } - for (BasicMessageConsumer consumer : _consumers.values()) - { - consumer.clearUnackedMessages(); - } - - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) - false)); // requeue - if (!isSuspended) - { - try + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + false) // requeue + , BasicRecoverOkBody.class); + + + if (_dispatcher != null) { - suspendChannel(false); + _dispatcher.rollback(); } - catch (AMQException e) + + if (!isSuspended) { - throw new JMSAMQException(e); + try + { + suspendChannel(false); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } @@ -1873,9 +1888,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void confirmConsumerCancelled(String consumerTag) { BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); - if ((consumer != null) && (consumer.isAutoClose())) + if (consumer != null) { - consumer.closeWhenNoMessages(true); + if (consumer.isAutoClose()) + { + consumer.closeWhenNoMessages(true); + } + //fixme seems abit like a hack +// else +// { +// consumer.rollback(); +// } } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index c5ac530297..2bec9dbcdb 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -22,8 +22,11 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.message.AMQMessage; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.TxRollbackBody; +import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.url.URLSyntaxException; import org.apache.log4j.Logger; @@ -34,6 +37,8 @@ import javax.jms.Queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; +import java.util.Set; +import java.util.HashSet; /** * This class tests a number of commits and roll back scenarios @@ -52,6 +57,7 @@ public class CommitRollbackTest extends TestCase Queue _jmsQueue; private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class); + private final int ACK_MODE = Session.CLIENT_ACKNOWLEDGE; protected void setUp() throws Exception { @@ -64,12 +70,12 @@ public class CommitRollbackTest extends TestCase { conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"); - _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + _session = conn.createSession(true, ACK_MODE); _jmsQueue = _session.createQueue(queue); _consumer = _session.createConsumer(_jmsQueue); - _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + _pubSession = conn.createSession(true, ACK_MODE); _publisher = _pubSession.createProducer(_pubSession.createQueue(queue)); @@ -129,6 +135,7 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); + assertTrue("Redelivered not true", result.getJMSRedelivered()); assertNull("test message was put and rolled back, but is still present", result); } @@ -247,7 +254,7 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _pubSession.getTransacted()); _logger.info("sending test message"); - String MESSAGE_TEXT = "testGetThenDisconnect"; + String MESSAGE_TEXT = "testGetThenRollback"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); _pubSession.commit(); @@ -258,6 +265,7 @@ public class CommitRollbackTest extends TestCase assertNotNull("retrieved message is null", msg); assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + assertTrue("Message redelivered not set", !msg.getJMSRedelivered()); _logger.info("rolling back"); @@ -270,6 +278,7 @@ public class CommitRollbackTest extends TestCase _session.commit(); assertNotNull("test message was consumed and rolled back, but is gone", result); assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertTrue("Message redelivered not set", result.getJMSRedelivered()); } @@ -296,6 +305,142 @@ public class CommitRollbackTest extends TestCase Message result = _consumer.receive(1000); assertNull("test message should be null", result); + + _session.commit(); } + /** + * Test that Closing a consumer and then connection while messags are being resent from a rolling back get correctly + * requeued a session purges the dispatcher queue, and the messages arrive in the correct order + * + * @throws Exception if something goes wrong + */ + public void testRollbackWithConsumerConnectionClose() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + + int MESSAGE_TO_SEND = 1000; + + for (int count = 0; count < MESSAGE_TO_SEND; count++) + { + _publisher.send(_pubSession.createTextMessage(String.valueOf(count))); + } + + _pubSession.commit(); + + _logger.info("getting a few messages"); + + for (int count = 0; count < MESSAGE_TO_SEND / 2; count++) + { + assertEquals(String.valueOf(count), ((TextMessage) _consumer.receive(1000)).getText()); + } + + + _logger.info("rolling back"); + _session.rollback(); + + _logger.info("closing consumer"); + _consumer.close(); + _logger.info("closed consumer"); + + _logger.info("close connection"); + conn.close(); + _logger.info("closed connection"); + + newConnection(); + + _logger.info("getting all messages"); + + Set<String> results = new HashSet<String>(); + for (int count = 0; count < MESSAGE_TO_SEND; count++) + { + TextMessage msg = ((TextMessage) _consumer.receive(1000)); + + assertNotNull("Message should not be null, count:" + count, msg); + String txt = msg.getText(); + _logger.trace("Received msg:" + txt + ":" + ((AMQMessage) msg).getDeliveryTag()); + results.add(txt); + } + + + Message result = _consumer.receive(1000); + assertNull("test message should be null", result); + + assertEquals("All messages not received", MESSAGE_TO_SEND, results.size()); + + _session.commit(); + } + + + /** + * Test that Closing a consumer and then session while messags are being resent from a rollback get correctly + * requeued, a session purges the dispatcher queue, and the messages arrive in the correct order + * + * @throws Exception if something goes wrong + */ + public void testRollbackWithConsumerAndSessionClose() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + + int MESSAGE_TO_SEND = 1000; + + for (int count = 0; count < MESSAGE_TO_SEND; count++) + { + _publisher.send(_pubSession.createTextMessage(String.valueOf(count))); + } + + _pubSession.commit(); + + _logger.info("getting a few messages"); + + for (int count = 0; count < MESSAGE_TO_SEND / 2; count++) + { + assertEquals(String.valueOf(count), ((TextMessage) _consumer.receive(1000)).getText()); + } + + + _logger.info("rolling back"); + _session.rollback(); + + _logger.info("closing consumer"); + _consumer.close(); + _logger.info("closed consumer"); + + _logger.info("closing session"); + _session.close(); + _logger.info("closed session"); + + _session = conn.createSession(true, ACK_MODE); + + _consumer = _session.createConsumer(_jmsQueue); + + _logger.info("getting all messages"); + + Set<String> results = new HashSet<String>(); + for (int count = 0; count < MESSAGE_TO_SEND; count++) + { + TextMessage msg = ((TextMessage) _consumer.receive(1000)); + + assertNotNull("Message should not be null, count:" + count, msg); + String txt = msg.getText(); + _logger.trace("Received msg:" + txt + ":" + ((AMQMessage) msg).getDeliveryTag()); + results.add(txt); + } + + + Message result = _consumer.receive(1000); + assertNull("test message should be null:" + result, result); + + assertEquals("All messages not received", MESSAGE_TO_SEND, results.size()); + + _session.commit(); + + + } } diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index f5d51b9826..c01a31a1bd 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -151,4 +151,9 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage { //no-op } + + public Object sendlock() + { + return null; + } } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 27f9802fb5..903e74c1a1 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -121,6 +121,11 @@ public class SubscriptionTestHelper implements Subscription //no-op } + public Object sendlock() + { + return null; + } + public int hashCode() { return key.hashCode(); diff --git a/qpid/specs/amqp.0-8.xml b/qpid/specs/amqp.0-8.xml index b84751c398..7b826a796c 100644 --- a/qpid/specs/amqp.0-8.xml +++ b/qpid/specs/amqp.0-8.xml @@ -169,6 +169,8 @@ Revision history: unacknowledged messages on a channel. 2006-07-03 (PH) - cosmetic clean-up of Basic.Recover comments. + + 2006-07-09 (MR) - added Basic.RecoverOk so we know when the recover has been done. --> <amqp major="8" minor="0" port="5672" comment="AMQ protocol 0.80"> @@ -2521,7 +2523,16 @@ localised reply text The server MUST raise a channel exception if this is called on a transacted channel. </doc> -</method> + <response name="rollback-ok"/> + </method> + <method name="recover-ok" synchronous="1" index="101"> + confirm a successful recover + <doc> + This method confirms to the client that the recover succeeded. + Note that if an recover fails, the server raises a channel exception. + </doc> + <chassis name="client" implement="MUST"/> + </method> </class> |