summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-03-30 16:02:45 +0000
committerRobert Gemmell <robbie@apache.org>2011-03-30 16:02:45 +0000
commitec03def927a7c2f2a1c686965774b01d3402b4cd (patch)
tree8d4dba4238de42f112df18992559aca41df32836
parent13ab6eab1b87f2b1311e211205bcce2b550f5e76 (diff)
downloadqpid-python-ec03def927a7c2f2a1c686965774b01d3402b4cd.tar.gz
QPID-3167: add a unit test of SimpleAMQQueue#processQueue to check delivery when subscriptions with unique selectors are in use
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1087001 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java76
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java130
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java21
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java2
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java12
25 files changed, 273 insertions, 94 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
index c07fa5af1d..958220caa7 100644
--- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
+++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
@@ -210,9 +210,9 @@ public class DiagnosticExchange extends AbstractExchange
Long value = new Long(SizeOf.getUsedMemory());
AMQShortString key = new AMQShortString("memory");
- FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).getHeaders();
+ FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().getProperties()).getHeaders();
headers.put(key, value);
- ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
+ ((BasicContentHeaderProperties)payload.getContentHeaderBody().getProperties()).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5317eb2fa3..e5bb72597d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -224,7 +224,7 @@ public class AMQChannel
finally
{
long bodySize = _currentMessage.getContentHeaderBody().bodySize;
- long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp();
+ long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().getProperties()).getTimestamp();
_session.registerMessageReceived(bodySize, timestamp);
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 8f918e557f..5082814229 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -351,7 +351,7 @@ public class HeadersExchange extends AbstractExchange
{
//what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
//but these are not yet implemented.
- return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
+ return ((BasicContentHeaderProperties) contentHeaderFrame.getProperties()).getHeaders();
}
protected ExchangeMBean createMBean() throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
index 946274f936..eb45ff94b5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -110,7 +110,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
{
CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ (CommonContentHeaderProperties) message.getContentHeaderBody().getProperties();
if (_logger.isDebugEnabled())
{
@@ -165,7 +165,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
AMQShortString replyTo = _properties.getReplyTo();
return (replyTo == null) ? null : replyTo.toString();
@@ -180,7 +180,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
{
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
AMQShortString type = _properties.getType();
return (type == null) ? null : type.toString();
@@ -208,7 +208,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
{
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
return (int) _properties.getPriority();
}
@@ -221,7 +221,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
AMQShortString messageId = _properties.getMessageId();
return (messageId == null) ? null : messageId;
@@ -235,7 +235,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
{
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
return _properties.getTimestamp();
}
@@ -247,7 +247,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
{
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
AMQShortString correlationId = _properties.getCorrelationId();
return (correlationId == null) ? null : correlationId.toString();
@@ -261,7 +261,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
return _properties.getExpiration();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 09602df399..33de9c631a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -410,7 +410,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
// Create header attributes list
CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().getProperties();
String mimeType = null, encoding = null;
if (headerProperties != null)
{
@@ -493,7 +493,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
{
List<String> list = new ArrayList<String>();
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties();
list.add("reply-to = " + headerProperties.getReplyToAsString());
list.add("propertyFlags = " + headerProperties.getPropertyFlags());
list.add("ApplicationID = " + headerProperties.getAppIdAsString());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 3a77883743..8496b9d097 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -59,7 +59,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
try
{
- Object value = ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).getHeaders().get(_conflationKey);
+ Object value = ((BasicContentHeaderProperties)message.getContentHeaderBody().getProperties()).getHeaders().get(_conflationKey);
if(value != null)
{
latestValueReference = _latestValuesMap.get(value);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 519e8bf50c..d8ad7be6bb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -94,9 +94,9 @@ public class IncomingMessage implements InboundMessage
public void setExpiration()
{
long expiration =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
long timestamp =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
if (SYNCHED_CLOCKS)
{
@@ -176,8 +176,8 @@ public class IncomingMessage implements InboundMessage
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
- AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null;
+ AMQShortString userID = getContentHeaderBody().getProperties() instanceof BasicContentHeaderProperties ?
+ ((BasicContentHeaderProperties) getContentHeaderBody().getProperties()).getUserId() : null;
if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
{
@@ -280,8 +280,8 @@ public class IncomingMessage implements InboundMessage
public boolean isPersistent()
{
- return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() ==
+ return getContentHeaderBody().getProperties() instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) getContentHeaderBody().getProperties()).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index a3f7a0dcc0..dc86422f51 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -57,7 +57,7 @@ public class PriorityQueueList implements QueueEntryList
{
try
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+ int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().getProperties()))).getPriority() - _priorityOffset;
if(index >= _priorities)
{
index = _priorities-1;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
new file mode 100644
index 0000000000..73d54e9247
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+
+public class QueueRunner implements ReadWriteRunnable
+{
+ private static final Logger _logger = Logger.getLogger(QueueRunner.class);
+
+ private String _name;
+ private SimpleAMQQueue _queue;
+
+ public QueueRunner(SimpleAMQQueue queue, long count)
+ {
+ _queue = queue;
+ _name = "QueueRunner-" + count + "-" + _queue.getLogActor();
+ }
+
+ public void run()
+ {
+ String originalName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(_name);
+ CurrentActor.set(_queue.getLogActor());
+
+ _queue.processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
+ }
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+
+ public String toString()
+ {
+ return _name;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 4d0bef90ae..bcb0c12420 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1230,10 +1230,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
-
public void deliverAsync()
{
- Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+ QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1246,52 +1245,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(new SubFlushRunner(sub));
}
-
- private class Runner implements ReadWriteRunnable
- {
- String _name;
- public Runner(long count)
- {
- _name = "QueueRunner-" + count + "-" + _logActor;
- }
-
- public void run()
- {
- String originalName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName(_name);
- CurrentActor.set(_logActor);
-
- processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- finally
- {
- CurrentActor.remove();
- Thread.currentThread().setName(originalName);
- }
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
- public String toString()
- {
- return _name;
- }
- }
-
private class SubFlushRunner implements ReadWriteRunnable
{
private final Subscription _sub;
@@ -1529,7 +1482,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
* @param runner the Runner to schedule
* @throws AMQException
*/
- private void processQueue(Runnable runner) throws AMQException
+ public void processQueue(QueueRunner runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
@@ -1897,4 +1850,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
}
}
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
index b09283b11f..791dfd101a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
@@ -120,8 +120,8 @@ public class TransientMessageData
public boolean isPersistent()
{
- return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() ==
+ return _contentHeaderBody.getProperties() instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index 80c642a734..f9b859ad97 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -361,7 +361,7 @@ public class Show extends AbstractCommand
try
{
- headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
+ headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().getProperties());
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 250b7470a4..5a2add4910 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -227,7 +227,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
static ContentHeaderBody getContentHeader(FieldTable headers)
{
ContentHeaderBody header = new ContentHeaderBody();
- header.properties = getProperties(headers);
+ header.setProperties(getProperties(headers));
return header;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index aff7af6952..d4f907ce79 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -95,7 +95,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
AMQMessage msg = super.createMessage(id);
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
props.setPriority(i);
- msg.getContentHeaderBody().properties = props;
+ msg.getContentHeaderBody().setProperties(props);
return msg;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index f7aefb1900..d9f143f8b8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -448,8 +448,8 @@ public class AMQQueueMBeanTest extends TestCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
+ contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+ ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1));
IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 79f7d75aa9..fbf1ac3591 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -144,7 +144,7 @@ public class AckTest extends TestCase
//This is DeliveryMode.PERSISTENT
b.setDeliveryMode((byte) 2);
ContentHeaderBody cb = new ContentHeaderBody();
- cb.properties = b;
+ cb.setProperties(b);
msg.setContentHeaderBody(cb);
}
else
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 53bf2dd852..2151e2a396 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -31,19 +31,15 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -405,8 +401,8 @@ public class SimpleAMQQueueTest extends TestCase
NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+ ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
@@ -432,6 +428,112 @@ public class SimpleAMQQueueTest extends TestCase
assertNull(data);
}
+ /**
+ * processQueue() is used when asynchronously delivering messages to
+ * subscriptions which could not be delivered immediately during the
+ * enqueue() operation.
+ *
+ * A defect within the method would mean that delivery of these messages may
+ * not occur should the Runner stop before all messages have been processed.
+ * Such a defect was discovered when Selectors were used such that one and
+ * only one subscription can/will accept any given messages, but multiple
+ * subscriptions are present, and one of the earlier subscriptions receives
+ * more messages than the others.
+ *
+ * This test is to validate that the processQueue() method is able to
+ * correctly deliver all of the messages present for asynchronous delivery
+ * to subscriptions in such a scenario.
+ */
+ public void testProcessQueueWithUniqueSelectors() throws Exception
+ {
+ StoreContext testContext = new StoreContext();
+ TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
+ SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString(
+ "testQueue"), false, new AMQShortString("testOwner"), false,
+ _virtualHost, factory)
+ {
+ @Override
+ public void deliverAsync(Subscription sub)
+ {
+ // do nothing, i.e prevent deliveries by the SubFlushRunner
+ // when registering the new subscriptions
+ }
+ };
+
+ // retrieve the QueueEntryList the queue creates and insert the test
+ // messages, thus avoiding straight-through delivery attempts during
+ //enqueue() process.
+ QueueEntryList list = factory.getQueueEntryList();
+ assertNotNull("QueueEntryList should have been created", list);
+
+ QueueEntry msg1 = list.add(createMessage(1L), testContext);
+ QueueEntry msg2 = list.add(createMessage(2L), testContext);
+ QueueEntry msg3 = list.add(createMessage(3L), testContext);
+ QueueEntry msg4 = list.add(createMessage(4L), testContext);
+ QueueEntry msg5 = list.add(createMessage(5L), testContext);
+
+ // Create lists of the entries each subscription should be interested
+ // in.Bias over 50% of the messages to the first subscription so that
+ // the later subscriptions reject them and report being done before
+ // the first subscription as the processQueue method proceeds.
+ List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+ List<QueueEntry> msgListSub2 = createEntriesList(msg4);
+ List<QueueEntry> msgListSub3 = createEntriesList(msg5);
+
+ MockSubscription sub1 = new MockSubscription(msgListSub1);
+ MockSubscription sub2 = new MockSubscription(msgListSub2);
+ MockSubscription sub3 = new MockSubscription(msgListSub3);
+
+ // register the subscriptions
+ testQueue.registerSubscription(sub1, false);
+ testQueue.registerSubscription(sub2, false);
+ testQueue.registerSubscription(sub3, false);
+
+ //check that no messages have been delivered to the
+ //subscriptions during registration
+ assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
+
+ // call processQueue to deliver the messages
+ testQueue.processQueue(new QueueRunner(testQueue, 1)
+ {
+ @Override
+ public void run()
+ {
+ // we dont actually want/need this runner to do any work
+ // because we we are already doing it!
+ }
+ });
+
+ // check expected messages delivered to correct consumers
+ verifyRecievedMessages(msgListSub1, sub1.getMessages());
+ verifyRecievedMessages(msgListSub2, sub2.getMessages());
+ verifyRecievedMessages(msgListSub3, sub3.getMessages());
+ }
+
+ private List<QueueEntry> createEntriesList(QueueEntry... entries)
+ {
+ ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+ for (QueueEntry entry : entries)
+ {
+ entriesList.add(entry);
+ }
+ return entriesList;
+ }
+
+ private void verifyRecievedMessages(List<QueueEntry> expected,
+ List<QueueEntry> delivered)
+ {
+ assertEquals("Consumer did not receive the expected number of messages",
+ expected.size(), delivered.size());
+
+ for (QueueEntry msg : expected)
+ {
+ assertTrue("Consumer did not recieve msg: "
+ + msg.getMessage().getMessageId(), delivered.contains(msg));
+ }
+ }
// FIXME: move this to somewhere useful
private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
@@ -495,4 +597,20 @@ public class SimpleAMQQueueTest extends TestCase
AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
return messageA;
}
+
+ class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
+ {
+ QueueEntryList _list;
+
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
+ {
+ _list = new SimpleQueueEntryList(queue);
+ return _list;
+ }
+
+ public QueueEntryList getQueueEntryList()
+ {
+ return _list;
+ }
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 7d337eed30..50cbb03353 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -366,7 +366,7 @@ public class MessageStoreTest extends TestCase
headerBody.classId = BasicConsumeBodyImpl.CLASS_ID;
headerBody.bodySize = 0;
- headerBody.properties = properties;
+ headerBody.setProperties(properties);
try
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 2346660d25..9eb05895fa 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -105,7 +105,7 @@ public class TestReferenceCounting extends TestCase
ContentHeaderBody chb = new ContentHeaderBody();
BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
bchp.setDeliveryMode((byte)2);
- chb.properties = bchp;
+ chb.setProperties(bchp);
return chb;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 673d3f1676..f8e729ed55 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -25,12 +25,12 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,11 +46,21 @@ public class MockSubscription implements Subscription
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private List<QueueEntry> _acceptEntries = null;
private static final AtomicLong idGenerator = new AtomicLong(0);
// Create a simple ID that increments for ever new Subscription
private final long _subscriptionID = idGenerator.getAndIncrement();
+ public MockSubscription()
+ {
+ }
+
+ public MockSubscription(List<QueueEntry> acceptEntries)
+ {
+ _acceptEntries = acceptEntries;
+ }
+
public void close()
{
_closed = true;
@@ -101,8 +111,15 @@ public class MockSubscription implements Subscription
_stateChangeLock.lock();
}
- public boolean hasInterest(QueueEntry msg)
+ public boolean hasInterest(QueueEntry entry)
{
+ if(_acceptEntries != null)
+ {
+ //simulate selector behaviour, only signal
+ //interest in the dictated queue entries
+ return _acceptEntries.contains(entry);
+ }
+
return true;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index f2096df9d1..8977b86112 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -193,7 +193,7 @@ public class InternalBrokerBaseCase extends TestCase
//Make Message Persistent
properties.setDeliveryMode((byte) 2);
- _headerBody.properties = properties;
+ _headerBody.setProperties(properties);
channel.publishContentHeader(_headerBody);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index e719c9a4b2..40c1df0c5d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -99,7 +99,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
}
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
- (BasicContentHeaderProperties) contentHeader.properties,
+ (BasicContentHeaderProperties) contentHeader.getProperties(),
exchange, routingKey);
return createMessage(delegate, data);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index a7d41e2cde..f3d2e22bad 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -103,7 +103,7 @@ public class MessageFactoryRegistry
AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
throws AMQException, JMSException
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
index 1db7e200bd..7d1651c1bf 100644
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
@@ -258,7 +258,7 @@ public class BasicDeliverTest
static ContentHeaderBody createContentHeaderBody()
{
ContentHeaderBody body = new ContentHeaderBody();
- body.properties = new BasicContentHeaderProperties();
+ body.setProperties(new BasicContentHeaderProperties());
body.weight = 1;
body.classId = 6;
return body;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 83e5a7e341..30db3b8be7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -36,7 +36,7 @@ public class ContentHeaderBody implements AMQBody
public long bodySize;
/** must never be null */
- public ContentHeaderProperties properties;
+ private ContentHeaderProperties properties;
public ContentHeaderBody()
{
@@ -128,4 +128,14 @@ public class ContentHeaderBody implements AMQBody
{
return new AMQFrame(channelId, body);
}
+
+ public ContentHeaderProperties getProperties()
+ {
+ return properties;
+ }
+
+ public void setProperties(ContentHeaderProperties props)
+ {
+ properties = props;
+ }
}