diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-03 19:48:46 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-03 19:48:46 +0000 |
commit | 02bbab932f5f845bfa8eac6069bc4159bbe53d07 (patch) | |
tree | 7a31c2804f9b9fa5f1dbabc80cbe219d9e735890 /java | |
parent | 7b0c33ff443deb937d26f07c039bd483e9bcbe29 (diff) | |
download | qpid-python-02bbab932f5f845bfa8eac6069bc4159bbe53d07.tar.gz |
QPID-3720 : [Java Broker] Implement Message Grouping
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1226930 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
14 files changed, 674 insertions, 55 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a1f1c037ec..32d9c4878a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -149,7 +149,13 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer void removeMessagesFromQueue(long fromMessageId, long toMessageId); - + static interface Visitor + { + boolean visit(QueueEntry entry); + } + + void visit(Visitor visitor); + long getMaximumMessageSize(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 37fad54c07..142cfddb39 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -75,6 +75,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.AVAILABLE; } + + public String toString() + { + return getState().name(); + } } @@ -85,6 +90,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.DEQUEUED; } + + public String toString() + { + return getState().name(); + } } @@ -95,6 +105,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.DELETED; } + + public String toString() + { + return getState().name(); + } } public final class ExpiredState extends EntryState @@ -104,6 +119,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.EXPIRED; } + + public String toString() + { + return getState().name(); + } } @@ -113,6 +133,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.ACQUIRED; } + + public String toString() + { + return getState().name(); + } } public final class SubscriptionAcquiredState extends EntryState @@ -134,6 +159,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return _subscription; } + + public String toString() + { + return "{" + getState().name() + " : " + _subscription +"}"; + } } public final class SubscriptionAssignedState extends EntryState @@ -155,6 +185,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return _subscription; } + + + public String toString() + { + return "{" + getState().name() + " : " + _subscription +"}"; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 77c4b912e0..641aaa0a08 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -35,4 +35,6 @@ public interface QueueEntryList<Q extends QueueEntry> Q getHead(); void entryDeleted(Q queueEntry); + + int getPriorities(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index ebed781a1a..d48445930a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -25,7 +25,6 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; @@ -33,7 +32,6 @@ import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.QueueConfigType; import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -45,6 +43,7 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.subscription.MessageGroupManager; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -68,11 +67,11 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); + private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; private final VirtualHost _virtualHost; @@ -189,6 +188,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount(); + private final MessageGroupManager _messageGroupManager; protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) { @@ -242,25 +242,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _logSubject = new QueueLogSubject(this); _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); - // Log the correct creation message - - // Extract the number of priorities for this Queue. - // Leave it as 0 if we are a SimpleQueueEntryList - int priorities = 0; - if (entryListFactory instanceof PriorityQueueList.Factory) - { - priorities = ((PriorityQueueList)_entries).getPriorities(); - } - // Log the creation of this Queue. // The priorities display is toggled on if we set priorities > 0 CurrentActor.get().message(_logSubject, QueueMessages.CREATED(String.valueOf(_owner), - priorities, - _owner != null, - autoDelete, - durable, !durable, - priorities > 0)); + _entries.getPriorities(), + _owner != null, + autoDelete, + durable, !durable, + _entries.getPriorities() > 0)); getConfigStore().addConfiguredObject(this); @@ -274,6 +264,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _logger.error("AMQQueue MBean creation has failed ", e); } + if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY)) + { + _messageGroupManager = new MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255); + } + else + { + _messageGroupManager = null; + } + resetNotifications(); } @@ -488,6 +487,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setExclusiveSubscriber(null); subscription.setQueueContext(null); + if(_messageGroupManager != null) + { + QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); + _messageGroupManager.clearAssignments(subscription); + + if(entry != null) + { + SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards + while (subscriberIter.advance()) + { + Subscription sub = subscriberIter.getNode().getSubscription(); + + // we don't make browsers send the same stuff twice + if (sub.seesRequeues()) + { + updateSubRequeueEntry(sub, entry); + } + } + + deliverAsync(); + + } + + } + // auto-delete queues must be deleted if there are no remaining subscribers if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) @@ -691,21 +716,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { try { - if (subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended()) + if (!sub.isSuspended() + && subscriptionReadyAndHasInterest(sub, entry) + && mightAssign(sub, entry) + && !sub.wouldSuspend(entry)) { - if (!sub.wouldSuspend(entry)) + if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub))) { - if (sub.acquires() && !entry.acquire(sub)) - { - // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription - sub.restoreCredit(entry); - } - else - { - deliverMessage(sub, entry, false); - } + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this subscription + sub.restoreCredit(entry); + } + else + { + deliverMessage(sub, entry, false); } } } @@ -716,6 +740,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + private boolean assign(final Subscription sub, final QueueEntry entry) + { + return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry); + } + + + private boolean mightAssign(final Subscription sub, final QueueEntry entry) + { + if(_messageGroupManager == null || !sub.acquires()) + return true; + Subscription assigned = _messageGroupManager.getAssignedSubscription(entry); + return (assigned == null) || (assigned == sub); + } + protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) { // This method is only required for queues which mess with ordering @@ -1020,6 +1058,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean filterComplete(); } + + public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) { return getMessagesOnTheQueue(new QueueEntryFilter() @@ -1074,6 +1114,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public void visit(final Visitor visitor) + { + QueueEntryIterator queueListIterator = _entries.iterator(); + + while(queueListIterator.advance()) + { + QueueEntry node = queueListIterator.getNode(); + + if(!node.isDispensed()) + { + if(visitor.visit(node)) + { + break; + } + } + } + } + /** * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. * @@ -1708,11 +1766,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (node != null && node.isAvailable()) { - if (sub.hasInterest(node)) + if (sub.hasInterest(node) && mightAssign(sub, node)) { if (!sub.wouldSuspend(node)) { - if (sub.acquires() && !node.acquire(sub)) + if (sub.acquires() && !(assign(sub, node) && node.acquire(sub))) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage // to acquire the entry for this subscription @@ -1769,7 +1827,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); boolean expired = false; - while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node))) + while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) || + !mightAssign(sub,node))) { if (expired) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index 0bb5dcc219..b40e5a28c2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -185,6 +185,11 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl advanceHead(); } + public int getPriorities() + { + return 0; + } + static class Factory implements QueueEntryListFactory { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 5f8ab16c06..414a123c43 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -51,13 +51,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl _propertyName = propertyName; } - @Override public AMQQueue getQueue() { return _queue; } - @Override public SortedQueueEntryImpl add(final ServerMessage message) { synchronized(_lock) @@ -286,7 +284,6 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl return (node == null ? Colour.BLACK : node.getColour()) == colour; } - @Override public SortedQueueEntryImpl next(final SortedQueueEntryImpl node) { synchronized(_lock) @@ -316,13 +313,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } } - @Override public QueueEntryIterator<SortedQueueEntryImpl> iterator() { return new QueueEntryIteratorImpl(_head); } - @Override public SortedQueueEntryImpl getHead() { return _head; @@ -333,7 +328,6 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl return _root; } - @Override public void entryDeleted(final SortedQueueEntryImpl entry) { synchronized(_lock) @@ -431,6 +425,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } } + public int getPriorities() + { + return 0; + } + /** * Swaps the position of the node in the tree with it's successor * (that is the node with the next highest key) diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java new file mode 100644 index 0000000000..1999d655c9 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; + + +public class MessageGroupManager +{ + private static final Logger _logger = LoggerFactory.getLogger(MessageGroupManager.class); + + + private final String _groupId; + private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>(); + private final int _groupMask; + + public MessageGroupManager(final String groupId, final int maxGroups) + { + _groupId = groupId; + _groupMask = pow2(maxGroups)-1; + } + + private static int pow2(final int i) + { + int val = 1; + while(val < i) val<<=1; + return val; + } + + public Subscription getAssignedSubscription(final QueueEntry entry) + { + Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); + return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask); + } + + public boolean acceptMessage(Subscription sub, QueueEntry entry) + { + Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); + if(groupVal == null) + { + return true; + } + else + { + Integer group = groupVal.hashCode() & _groupMask; + Subscription assignedSub = _groupMap.get(group); + if(assignedSub == sub) + { + return true; + } + else + { + if(assignedSub == null) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Assigning group " + groupVal + " to sub " + sub); + } + assignedSub = _groupMap.putIfAbsent(group, sub); + return assignedSub == null || assignedSub == sub; + } + else + { + return false; + } + } + } + } + + public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub) + { + EntryFinder visitor = new EntryFinder(sub); + sub.getQueue().visit(visitor); + return visitor.getEntry(); + } + + private class EntryFinder implements AMQQueue.Visitor + { + private QueueEntry _entry; + private Subscription _sub; + + public EntryFinder(final Subscription sub) + { + _sub = sub; + } + + public boolean visit(final QueueEntry entry) + { + if(!entry.isAvailable()) + return false; + + Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId); + if(groupId == null) + return false; + + Integer group = groupId.hashCode() & _groupMask; + Subscription assignedSub = _groupMap.get(group); + if(assignedSub == _sub) + { + _entry = entry; + return true; + } + else + { + return false; + } + } + + public QueueEntry getEntry() + { + return _entry; + } + } + + public void clearAssignments(Subscription sub) + { + Iterator<Subscription> subIter = _groupMap.values().iterator(); + while(subIter.hasNext()) + { + if(subIter.next() == sub) + { + subIter.remove(); + } + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 8b029f9a51..f97ac5659e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -601,20 +601,20 @@ public class MockAMQQueue implements AMQQueue } - @Override public int getMaximumDeliveryCount() { return 0; } - @Override public void setMaximumDeliveryCount(int maximumDeliveryCount) { } - @Override public void setAlternateExchange(String exchangeName) { } + public void visit(final Visitor visitor) + { + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8984b7ca8c..4b6015945e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1583,6 +1583,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return _prefetchLowMark; } + public int getPrefetch() + { + return _prefetchHighMark; + } + public AMQShortString getDefaultQueueExchangeName() { return _connection.getDefaultQueueExchangeName(); @@ -3047,7 +3052,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public boolean prefetch() { - return getAMQConnection().getMaxPrefetch() > 0; + return _prefetchHighMark > 0; } /** Signifies that the session has pending sends to commit. */ diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index a49e31ce8c..3b6179dd07 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -545,7 +545,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } else if (getSession().prefetch()) { - capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + capacity = getSession().getPrefetch(); } return capacity; } diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index f53fa8d83c..1889577773 100644 --- a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -588,7 +588,7 @@ public class AMQSession_0_10Test extends TestCase } boolean isTransacted = acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED ? true : false; AMQSession_0_10 session = new AMQSession_0_10(createConnection(throwException), amqConnection, 1, isTransacted, acknowledgeMode, - 1, 1, "test"); + 0, 0, "test"); return session; } @@ -600,7 +600,6 @@ public class AMQSession_0_10Test extends TestCase connection.setSessionFactory(new SessionFactory() { - @Override public Session newSession(Connection conn, Binary name, long expiry) { return new MockSession(conn, new SessionDelegate(), name, expiry, throwException); @@ -611,7 +610,6 @@ public class AMQSession_0_10Test extends TestCase private final class MockMessageListener implements MessageListener { - @Override public void onMessage(Message arg0) { } @@ -710,23 +708,19 @@ public class AMQSession_0_10Test extends TestCase { private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>(); - @Override public void setIdleTimeout(int i) { } - @Override public void send(ProtocolEvent msg) { _sendEvents.add(msg); } - @Override public void flush() { } - @Override public void close() { } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index 84e4056f4d..f64164c10b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -166,6 +166,11 @@ public abstract class AMQTypedValue private static final class IntTypedValue extends AMQTypedValue { + @Override + public String toString() + { + return "[INT: " + String.valueOf(_value) + "]"; + } private final int _value; diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index d391181217..5a9ea73cae 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -44,6 +44,7 @@ import static org.apache.qpid.util.Serial.max; import static org.apache.qpid.util.Strings.toUTF8; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -828,8 +829,17 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { - checkFailoverRequired("Session sync was interrupted by failover."); - log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, Arrays.asList(commands)); + checkFailoverRequired("Session sync was interrupted by failover."); + if(log.isDebugEnabled()) + { + List<Method> waitingFor = + Arrays.asList(commands) + .subList(mod(maxComplete,commands.length), + mod(commandsOut-1, commands.length) < mod(maxComplete, commands.length) + ? commands.length-1 + : mod(commandsOut-1, commands.length)); + log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, waitingFor); + } w.await(); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java new file mode 100644 index 0000000000..dc29ef378e --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java @@ -0,0 +1,348 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.NamingException; +import java.util.HashMap; +import java.util.Map; + +public class MessageGroupQueueTest extends QpidBrokerTestCase +{ + private static final int TIMEOUT = 1500; + + protected final String QUEUE = "MessageGroupQueue"; + + private static final int MSG_COUNT = 50; + + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + + + protected void setUp() throws Exception + { + super.setUp(); + + producerConnection = getConnection(); + producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + producerConnection.start(); + + consumerConnection = getConnection(); + + } + + protected void tearDown() throws Exception + { + producerConnection.close(); + consumerConnection.close(); + super.tearDown(); + } + + /** + * Pre populate the queue with messages with groups as follows + * + * ONE + * TWO + * ONE + * TWO + * + * Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second + * consumer assigned group TWO if they are started in sequence. + * + * Thus doing + * + * c1 <--- (ONE) + * c2 <--- (TWO) + * c2 ack ---> + * + * c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE) + * + * i.e. + * + * c2 <--- (TWO) + * c2 ack ---> + * c1 <--- (ONE) + * c1 ack ---> + * + */ + public void testSimpleGroupAssignment() throws Exception + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("qpid.group_header_key","group"); + arguments.put("qpid.shared_msg_group","1"); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + String[] groups = { "ONE", "TWO"}; + + for (int msg = 0; msg < 4; msg++) + { + producer.send(createMessage(msg, groups[msg % groups.length])); + } + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + + + MessageConsumer consumer1 = cs1.createConsumer(queue); + MessageConsumer consumer2 = cs2.createConsumer(queue); + + consumerConnection.start(); + Message cs1Received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received first message", cs1Received); + + Message cs2Received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received first message", cs2Received); + + cs1Received.acknowledge(); + cs2Received.acknowledge(); + + Message cs2Received2 = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received second message", cs2Received2); + assertEquals("Differing groups", cs2Received2.getStringProperty("group"), + cs2Received.getStringProperty("group")); + + Message cs1Received2 = consumer1.receive(1000); + + assertNotNull("Consumer 1 should have received second message", cs1Received2); + assertEquals("Differing groups", cs1Received2.getStringProperty("group"), + cs1Received.getStringProperty("group")); + + cs1Received2.acknowledge(); + cs2Received2.acknowledge(); + + assertNull(consumer1.receive(1000)); + assertNull(consumer2.receive(1000)); + } + + /** + * + * Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different + * consumer. + * + * Pre-populate the queue as ONE, ONE, TWO, ONE + * + * create in sequence two consumers + * + * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2) + * + * Then close c1 before acking. + * + * If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which + * requires c2 to go "backwards" in the queue). + * + * */ + public void testConsumerCloseGroupAssignment() throws Exception + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("qpid.group_header_key","group"); + arguments.put("qpid.shared_msg_group","1"); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + producer.send(createMessage(1, "ONE")); + producer.send(createMessage(2, "ONE")); + producer.send(createMessage(3, "TWO")); + producer.send(createMessage(4, "ONE")); + + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + + + MessageConsumer consumer1 = cs1.createConsumer(queue); + + consumerConnection.start(); + MessageConsumer consumer2 = cs2.createConsumer(queue); + + Message cs1Received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received first message", cs1Received); + + Message cs2Received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received first message", cs2Received); + cs2Received.acknowledge(); + + Message cs2Received2 = consumer2.receive(1000); + + assertNull("Consumer 2 should not have received second message", cs2Received2); + + consumer1.close(); + + cs1Received.acknowledge(); + Message cs2Received3 = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received second message", cs2Received3); + assertEquals("Unexpected group", cs2Received3.getStringProperty("group"), + "ONE"); + + cs2Received3.acknowledge(); + + + Message cs2Received4 = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received third message", cs2Received4); + assertEquals("Unexpected group", cs2Received4.getStringProperty("group"), + "ONE"); + + cs2Received4.acknowledge(); + + assertNull(consumer2.receive(1000)); + } + + + /** + * + * Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned + * toa different consumer, including messages which were previously delivered but have now been released. + * + * Pre-populate the queue as ONE, ONE, TWO, ONE + * + * create in sequence two consumers + * + * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2) + * + * Then close c1 and its session without acking. + * + * If we now attempt to receive from c2, then the all messages in group ONE should be available (which + * requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered + * + */ + + public void testConsumerCloseWithRelease() throws Exception + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("qpid.group_header_key","group"); + arguments.put("qpid.shared_msg_group","1"); + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + producer.send(createMessage(1, "ONE")); + producer.send(createMessage(2, "ONE")); + producer.send(createMessage(3, "TWO")); + producer.send(createMessage(4, "ONE")); + + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + + + MessageConsumer consumer1 = cs1.createConsumer(queue); + MessageConsumer consumer2 = cs2.createConsumer(queue); + + consumerConnection.start(); + Message cs1Received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received first message", cs1Received); + + Message received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received first message", received); + Message first = received; + + received = consumer2.receive(1000); + + assertNull("Consumer 2 should not have received second message", received); + + consumer1.close(); + cs1.close(); + first.acknowledge(); + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received second message", received); + assertEquals("Unexpected group", received.getStringProperty("group"), + "ONE"); + assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"), + received.getJMSRedelivered()); + + received.acknowledge(); + + + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received third message", received); + assertEquals("Unexpected group", received.getStringProperty("group"), + "ONE"); + + received.acknowledge(); + + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received fourth message", received); + assertEquals("Unexpected group", received.getStringProperty("group"), + "ONE"); + + received.acknowledge(); + + + assertNull(consumer2.receive(1000)); + } + + + private Message createMessage(int msg, String group) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + send.setIntProperty("msg", msg); + send.setStringProperty("group", group); + + return send; + } +} |