summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-03 19:48:46 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-03 19:48:46 +0000
commit02bbab932f5f845bfa8eac6069bc4159bbe53d07 (patch)
tree7a31c2804f9b9fa5f1dbabc80cbe219d9e735890 /java
parent7b0c33ff443deb937d26f07c039bd483e9bcbe29 (diff)
downloadqpid-python-02bbab932f5f845bfa8eac6069bc4159bbe53d07.tar.gz
QPID-3720 : [Java Broker] Implement Message Grouping
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1226930 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java127
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java150
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java14
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java348
14 files changed, 674 insertions, 55 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a1f1c037ec..32d9c4878a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -149,7 +149,13 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
void removeMessagesFromQueue(long fromMessageId, long toMessageId);
-
+ static interface Visitor
+ {
+ boolean visit(QueueEntry entry);
+ }
+
+ void visit(Visitor visitor);
+
long getMaximumMessageSize();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 37fad54c07..142cfddb39 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -75,6 +75,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.AVAILABLE;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -85,6 +90,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.DEQUEUED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -95,6 +105,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.DELETED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
public final class ExpiredState extends EntryState
@@ -104,6 +119,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.EXPIRED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -113,6 +133,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.ACQUIRED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
public final class SubscriptionAcquiredState extends EntryState
@@ -134,6 +159,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return _subscription;
}
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
}
public final class SubscriptionAssignedState extends EntryState
@@ -155,6 +185,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return _subscription;
}
+
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 77c4b912e0..641aaa0a08 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -35,4 +35,6 @@ public interface QueueEntryList<Q extends QueueEntry>
Q getHead();
void entryDeleted(Q queueEntry);
+
+ int getPriorities();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index ebed781a1a..d48445930a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
@@ -33,7 +32,6 @@ import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -45,6 +43,7 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.MessageGroupManager;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -68,11 +67,11 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+ private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
private final VirtualHost _virtualHost;
@@ -189,6 +188,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
+ private final MessageGroupManager _messageGroupManager;
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
{
@@ -242,25 +242,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_logSubject = new QueueLogSubject(this);
_logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
- // Log the correct creation message
-
- // Extract the number of priorities for this Queue.
- // Leave it as 0 if we are a SimpleQueueEntryList
- int priorities = 0;
- if (entryListFactory instanceof PriorityQueueList.Factory)
- {
- priorities = ((PriorityQueueList)_entries).getPriorities();
- }
-
// Log the creation of this Queue.
// The priorities display is toggled on if we set priorities > 0
CurrentActor.get().message(_logSubject,
QueueMessages.CREATED(String.valueOf(_owner),
- priorities,
- _owner != null,
- autoDelete,
- durable, !durable,
- priorities > 0));
+ _entries.getPriorities(),
+ _owner != null,
+ autoDelete,
+ durable, !durable,
+ _entries.getPriorities() > 0));
getConfigStore().addConfiguredObject(this);
@@ -274,6 +264,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_logger.error("AMQQueue MBean creation has failed ", e);
}
+ if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
+ {
+ _messageGroupManager = new MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255);
+ }
+ else
+ {
+ _messageGroupManager = null;
+ }
+
resetNotifications();
}
@@ -488,6 +487,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setExclusiveSubscriber(null);
subscription.setQueueContext(null);
+ if(_messageGroupManager != null)
+ {
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ _messageGroupManager.clearAssignments(subscription);
+
+ if(entry != null)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues())
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+
+ }
+
+ }
+
// auto-delete queues must be deleted if there are no remaining subscribers
if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
@@ -691,21 +716,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
try
{
- if (subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended())
+ if (!sub.isSuspended()
+ && subscriptionReadyAndHasInterest(sub, entry)
+ && mightAssign(sub, entry)
+ && !sub.wouldSuspend(entry))
{
- if (!sub.wouldSuspend(entry))
+ if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
{
- if (sub.acquires() && !entry.acquire(sub))
- {
- // restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
- sub.restoreCredit(entry);
- }
- else
- {
- deliverMessage(sub, entry, false);
- }
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ deliverMessage(sub, entry, false);
}
}
}
@@ -716,6 +740,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ private boolean assign(final Subscription sub, final QueueEntry entry)
+ {
+ return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+ }
+
+
+ private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+ {
+ if(_messageGroupManager == null || !sub.acquires())
+ return true;
+ Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ return (assigned == null) || (assigned == sub);
+ }
+
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
@@ -1020,6 +1058,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean filterComplete();
}
+
+
public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
{
return getMessagesOnTheQueue(new QueueEntryFilter()
@@ -1074,6 +1114,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ public void visit(final Visitor visitor)
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
+
+ while(queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+
+ if(!node.isDispensed())
+ {
+ if(visitor.visit(node))
+ {
+ break;
+ }
+ }
+ }
+ }
+
/**
* Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
*
@@ -1708,11 +1766,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (node != null && node.isAvailable())
{
- if (sub.hasInterest(node))
+ if (sub.hasInterest(node) && mightAssign(sub, node))
{
if (!sub.wouldSuspend(node))
{
- if (sub.acquires() && !node.acquire(sub))
+ if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
@@ -1769,7 +1827,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
+ !mightAssign(sub,node)))
{
if (expired)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index 0bb5dcc219..b40e5a28c2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -185,6 +185,11 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
advanceHead();
}
+ public int getPriorities()
+ {
+ return 0;
+ }
+
static class Factory implements QueueEntryListFactory
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index 5f8ab16c06..414a123c43 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -51,13 +51,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
_propertyName = propertyName;
}
- @Override
public AMQQueue getQueue()
{
return _queue;
}
- @Override
public SortedQueueEntryImpl add(final ServerMessage message)
{
synchronized(_lock)
@@ -286,7 +284,6 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
return (node == null ? Colour.BLACK : node.getColour()) == colour;
}
- @Override
public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
{
synchronized(_lock)
@@ -316,13 +313,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
}
- @Override
public QueueEntryIterator<SortedQueueEntryImpl> iterator()
{
return new QueueEntryIteratorImpl(_head);
}
- @Override
public SortedQueueEntryImpl getHead()
{
return _head;
@@ -333,7 +328,6 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
return _root;
}
- @Override
public void entryDeleted(final SortedQueueEntryImpl entry)
{
synchronized(_lock)
@@ -431,6 +425,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
}
+ public int getPriorities()
+ {
+ return 0;
+ }
+
/**
* Swaps the position of the node in the tree with it's successor
* (that is the node with the next highest key)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
new file mode 100644
index 0000000000..1999d655c9
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class MessageGroupManager
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MessageGroupManager.class);
+
+
+ private final String _groupId;
+ private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
+ private final int _groupMask;
+
+ public MessageGroupManager(final String groupId, final int maxGroups)
+ {
+ _groupId = groupId;
+ _groupMask = pow2(maxGroups)-1;
+ }
+
+ private static int pow2(final int i)
+ {
+ int val = 1;
+ while(val < i) val<<=1;
+ return val;
+ }
+
+ public Subscription getAssignedSubscription(final QueueEntry entry)
+ {
+ Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
+ }
+
+ public boolean acceptMessage(Subscription sub, QueueEntry entry)
+ {
+ Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupVal == null)
+ {
+ return true;
+ }
+ else
+ {
+ Integer group = groupVal.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == sub)
+ {
+ return true;
+ }
+ else
+ {
+ if(assignedSub == null)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Assigning group " + groupVal + " to sub " + sub);
+ }
+ assignedSub = _groupMap.putIfAbsent(group, sub);
+ return assignedSub == null || assignedSub == sub;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ }
+
+ public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+ {
+ EntryFinder visitor = new EntryFinder(sub);
+ sub.getQueue().visit(visitor);
+ return visitor.getEntry();
+ }
+
+ private class EntryFinder implements AMQQueue.Visitor
+ {
+ private QueueEntry _entry;
+ private Subscription _sub;
+
+ public EntryFinder(final Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public boolean visit(final QueueEntry entry)
+ {
+ if(!entry.isAvailable())
+ return false;
+
+ Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupId == null)
+ return false;
+
+ Integer group = groupId.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == _sub)
+ {
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public QueueEntry getEntry()
+ {
+ return _entry;
+ }
+ }
+
+ public void clearAssignments(Subscription sub)
+ {
+ Iterator<Subscription> subIter = _groupMap.values().iterator();
+ while(subIter.hasNext())
+ {
+ if(subIter.next() == sub)
+ {
+ subIter.remove();
+ }
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 8b029f9a51..f97ac5659e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -601,20 +601,20 @@ public class MockAMQQueue implements AMQQueue
}
- @Override
public int getMaximumDeliveryCount()
{
return 0;
}
- @Override
public void setMaximumDeliveryCount(int maximumDeliveryCount)
{
}
- @Override
public void setAlternateExchange(String exchangeName)
{
}
+ public void visit(final Visitor visitor)
+ {
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8984b7ca8c..4b6015945e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1583,6 +1583,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return _prefetchLowMark;
}
+ public int getPrefetch()
+ {
+ return _prefetchHighMark;
+ }
+
public AMQShortString getDefaultQueueExchangeName()
{
return _connection.getDefaultQueueExchangeName();
@@ -3047,7 +3052,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public boolean prefetch()
{
- return getAMQConnection().getMaxPrefetch() > 0;
+ return _prefetchHighMark > 0;
}
/** Signifies that the session has pending sends to commit. */
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index a49e31ce8c..3b6179dd07 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -545,7 +545,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
else if (getSession().prefetch())
{
- capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+ capacity = getSession().getPrefetch();
}
return capacity;
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index f53fa8d83c..1889577773 100644
--- a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -588,7 +588,7 @@ public class AMQSession_0_10Test extends TestCase
}
boolean isTransacted = acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED ? true : false;
AMQSession_0_10 session = new AMQSession_0_10(createConnection(throwException), amqConnection, 1, isTransacted, acknowledgeMode,
- 1, 1, "test");
+ 0, 0, "test");
return session;
}
@@ -600,7 +600,6 @@ public class AMQSession_0_10Test extends TestCase
connection.setSessionFactory(new SessionFactory()
{
- @Override
public Session newSession(Connection conn, Binary name, long expiry)
{
return new MockSession(conn, new SessionDelegate(), name, expiry, throwException);
@@ -611,7 +610,6 @@ public class AMQSession_0_10Test extends TestCase
private final class MockMessageListener implements MessageListener
{
- @Override
public void onMessage(Message arg0)
{
}
@@ -710,23 +708,19 @@ public class AMQSession_0_10Test extends TestCase
{
private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>();
- @Override
public void setIdleTimeout(int i)
{
}
- @Override
public void send(ProtocolEvent msg)
{
_sendEvents.add(msg);
}
- @Override
public void flush()
{
}
- @Override
public void close()
{
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index 84e4056f4d..f64164c10b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -166,6 +166,11 @@ public abstract class AMQTypedValue
private static final class IntTypedValue extends AMQTypedValue
{
+ @Override
+ public String toString()
+ {
+ return "[INT: " + String.valueOf(_value) + "]";
+ }
private final int _value;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index d391181217..5a9ea73cae 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -44,6 +44,7 @@ import static org.apache.qpid.util.Serial.max;
import static org.apache.qpid.util.Strings.toUTF8;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -828,8 +829,17 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
- checkFailoverRequired("Session sync was interrupted by failover.");
- log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, Arrays.asList(commands));
+ checkFailoverRequired("Session sync was interrupted by failover.");
+ if(log.isDebugEnabled())
+ {
+ List<Method> waitingFor =
+ Arrays.asList(commands)
+ .subList(mod(maxComplete,commands.length),
+ mod(commandsOut-1, commands.length) < mod(maxComplete, commands.length)
+ ? commands.length-1
+ : mod(commandsOut-1, commands.length));
+ log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, waitingFor);
+ }
w.await();
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
new file mode 100644
index 0000000000..dc29ef378e
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
@@ -0,0 +1,348 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.NamingException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MessageGroupQueueTest extends QpidBrokerTestCase
+{
+ private static final int TIMEOUT = 1500;
+
+ protected final String QUEUE = "MessageGroupQueue";
+
+ private static final int MSG_COUNT = 50;
+
+ private Connection producerConnection;
+ private MessageProducer producer;
+ private Session producerSession;
+ private Queue queue;
+ private Connection consumerConnection;
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ producerConnection = getConnection();
+ producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ producerConnection.start();
+
+ consumerConnection = getConnection();
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ producerConnection.close();
+ consumerConnection.close();
+ super.tearDown();
+ }
+
+ /**
+ * Pre populate the queue with messages with groups as follows
+ *
+ * ONE
+ * TWO
+ * ONE
+ * TWO
+ *
+ * Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second
+ * consumer assigned group TWO if they are started in sequence.
+ *
+ * Thus doing
+ *
+ * c1 <--- (ONE)
+ * c2 <--- (TWO)
+ * c2 ack --->
+ *
+ * c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE)
+ *
+ * i.e.
+ *
+ * c2 <--- (TWO)
+ * c2 ack --->
+ * c1 <--- (ONE)
+ * c1 ack --->
+ *
+ */
+ public void testSimpleGroupAssignment() throws Exception
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.group_header_key","group");
+ arguments.put("qpid.shared_msg_group","1");
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ String[] groups = { "ONE", "TWO"};
+
+ for (int msg = 0; msg < 4; msg++)
+ {
+ producer.send(createMessage(msg, groups[msg % groups.length]));
+ }
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ consumerConnection.start();
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received first message", cs1Received);
+
+ Message cs2Received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received first message", cs2Received);
+
+ cs1Received.acknowledge();
+ cs2Received.acknowledge();
+
+ Message cs2Received2 = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received second message", cs2Received2);
+ assertEquals("Differing groups", cs2Received2.getStringProperty("group"),
+ cs2Received.getStringProperty("group"));
+
+ Message cs1Received2 = consumer1.receive(1000);
+
+ assertNotNull("Consumer 1 should have received second message", cs1Received2);
+ assertEquals("Differing groups", cs1Received2.getStringProperty("group"),
+ cs1Received.getStringProperty("group"));
+
+ cs1Received2.acknowledge();
+ cs2Received2.acknowledge();
+
+ assertNull(consumer1.receive(1000));
+ assertNull(consumer2.receive(1000));
+ }
+
+ /**
+ *
+ * Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different
+ * consumer.
+ *
+ * Pre-populate the queue as ONE, ONE, TWO, ONE
+ *
+ * create in sequence two consumers
+ *
+ * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
+ *
+ * Then close c1 before acking.
+ *
+ * If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which
+ * requires c2 to go "backwards" in the queue).
+ *
+ * */
+ public void testConsumerCloseGroupAssignment() throws Exception
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.group_header_key","group");
+ arguments.put("qpid.shared_msg_group","1");
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ producer.send(createMessage(1, "ONE"));
+ producer.send(createMessage(2, "ONE"));
+ producer.send(createMessage(3, "TWO"));
+ producer.send(createMessage(4, "ONE"));
+
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+
+ consumerConnection.start();
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received first message", cs1Received);
+
+ Message cs2Received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received first message", cs2Received);
+ cs2Received.acknowledge();
+
+ Message cs2Received2 = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not have received second message", cs2Received2);
+
+ consumer1.close();
+
+ cs1Received.acknowledge();
+ Message cs2Received3 = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received second message", cs2Received3);
+ assertEquals("Unexpected group", cs2Received3.getStringProperty("group"),
+ "ONE");
+
+ cs2Received3.acknowledge();
+
+
+ Message cs2Received4 = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received third message", cs2Received4);
+ assertEquals("Unexpected group", cs2Received4.getStringProperty("group"),
+ "ONE");
+
+ cs2Received4.acknowledge();
+
+ assertNull(consumer2.receive(1000));
+ }
+
+
+ /**
+ *
+ * Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned
+ * toa different consumer, including messages which were previously delivered but have now been released.
+ *
+ * Pre-populate the queue as ONE, ONE, TWO, ONE
+ *
+ * create in sequence two consumers
+ *
+ * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
+ *
+ * Then close c1 and its session without acking.
+ *
+ * If we now attempt to receive from c2, then the all messages in group ONE should be available (which
+ * requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered
+ *
+ */
+
+ public void testConsumerCloseWithRelease() throws Exception
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.group_header_key","group");
+ arguments.put("qpid.shared_msg_group","1");
+
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ producer.send(createMessage(1, "ONE"));
+ producer.send(createMessage(2, "ONE"));
+ producer.send(createMessage(3, "TWO"));
+ producer.send(createMessage(4, "ONE"));
+
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ consumerConnection.start();
+ Message cs1Received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received first message", cs1Received);
+
+ Message received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received first message", received);
+ Message first = received;
+
+ received = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not have received second message", received);
+
+ consumer1.close();
+ cs1.close();
+ first.acknowledge();
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received second message", received);
+ assertEquals("Unexpected group", received.getStringProperty("group"),
+ "ONE");
+ assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
+ received.getJMSRedelivered());
+
+ received.acknowledge();
+
+
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received third message", received);
+ assertEquals("Unexpected group", received.getStringProperty("group"),
+ "ONE");
+
+ received.acknowledge();
+
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received fourth message", received);
+ assertEquals("Unexpected group", received.getStringProperty("group"),
+ "ONE");
+
+ received.acknowledge();
+
+
+ assertNull(consumer2.receive(1000));
+ }
+
+
+ private Message createMessage(int msg, String group) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msg);
+ send.setIntProperty("msg", msg);
+ send.setStringProperty("group", group);
+
+ return send;
+ }
+}