diff options
25 files changed, 273 insertions, 94 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java index c07fa5af1d..958220caa7 100644 --- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java +++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java @@ -210,9 +210,9 @@ public class DiagnosticExchange extends AbstractExchange Long value = new Long(SizeOf.getUsedMemory()); AMQShortString key = new AMQShortString("memory"); - FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).getHeaders(); + FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().getProperties()).getHeaders(); headers.put(key, value); - ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers); + ((BasicContentHeaderProperties)payload.getContentHeaderBody().getProperties()).setHeaders(headers); AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue")); ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5317eb2fa3..e5bb72597d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -224,7 +224,7 @@ public class AMQChannel finally { long bodySize = _currentMessage.getContentHeaderBody().bodySize; - long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp(); + long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().getProperties()).getTimestamp(); _session.registerMessageReceived(bodySize, timestamp); // callback to allow the context to do any post message processing // primary use is to allow message return processing in the non-tx case diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 8f918e557f..5082814229 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -351,7 +351,7 @@ public class HeadersExchange extends AbstractExchange { //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers, //but these are not yet implemented. - return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders(); + return ((BasicContentHeaderProperties) contentHeaderFrame.getProperties()).getHeaders(); } protected ExchangeMBean createMBean() throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index 946274f936..eb45ff94b5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -110,7 +110,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> { CommonContentHeaderProperties _properties = - (CommonContentHeaderProperties) message.getContentHeaderBody().properties; + (CommonContentHeaderProperties) message.getContentHeaderBody().getProperties(); if (_logger.isDebugEnabled()) { @@ -165,7 +165,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; + message.getContentHeaderBody().getProperties(); AMQShortString replyTo = _properties.getReplyTo(); return (replyTo == null) ? null : replyTo.toString(); @@ -180,7 +180,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> { CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; + message.getContentHeaderBody().getProperties(); AMQShortString type = _properties.getType(); return (type == null) ? null : type.toString(); @@ -208,7 +208,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> { CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; + message.getContentHeaderBody().getProperties(); return (int) _properties.getPriority(); } @@ -221,7 +221,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; + message.getContentHeaderBody().getProperties(); AMQShortString messageId = _properties.getMessageId(); return (messageId == null) ? null : messageId; @@ -235,7 +235,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> { CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; + message.getContentHeaderBody().getProperties(); return _properties.getTimestamp(); } @@ -247,7 +247,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> { CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; + message.getContentHeaderBody().getProperties(); AMQShortString correlationId = _properties.getCorrelationId(); return (correlationId == null) ? null : correlationId.toString(); @@ -261,7 +261,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; + message.getContentHeaderBody().getProperties(); return _properties.getExpiration(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 09602df399..33de9c631a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -410,7 +410,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { // Create header attributes list CommonContentHeaderProperties headerProperties = - (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; + (CommonContentHeaderProperties) msg.getContentHeaderBody().getProperties(); String mimeType = null, encoding = null; if (headerProperties != null) { @@ -493,7 +493,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private String[] getMessageHeaderProperties(ContentHeaderBody headerBody) { List<String> list = new ArrayList<String>(); - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties(); list.add("reply-to = " + headerProperties.getReplyToAsString()); list.add("propertyFlags = " + headerProperties.getPropertyFlags()); list.add("ApplicationID = " + headerProperties.getAppIdAsString()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 3a77883743..8496b9d097 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -59,7 +59,7 @@ public class ConflationQueueList extends SimpleQueueEntryList try { - Object value = ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).getHeaders().get(_conflationKey); + Object value = ((BasicContentHeaderProperties)message.getContentHeaderBody().getProperties()).getHeaders().get(_conflationKey); if(value != null) { latestValueReference = _latestValuesMap.get(value); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 519e8bf50c..d8ad7be6bb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -94,9 +94,9 @@ public class IncomingMessage implements InboundMessage public void setExpiration() { long expiration = - ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration(); + ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration(); long timestamp = - ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp(); + ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp(); if (SYNCHED_CLOCKS) { @@ -176,8 +176,8 @@ public class IncomingMessage implements InboundMessage // now that it has all been received, before we attempt delivery _txnContext.messageFullyReceived(isPersistent()); - AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ? - ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null; + AMQShortString userID = getContentHeaderBody().getProperties() instanceof BasicContentHeaderProperties ? + ((BasicContentHeaderProperties) getContentHeaderBody().getProperties()).getUserId() : null; if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString())) { @@ -280,8 +280,8 @@ public class IncomingMessage implements InboundMessage public boolean isPersistent() { - return getContentHeaderBody().properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == + return getContentHeaderBody().getProperties() instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) getContentHeaderBody().getProperties()).getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index a3f7a0dcc0..dc86422f51 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -57,7 +57,7 @@ public class PriorityQueueList implements QueueEntryList { try { - int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; + int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().getProperties()))).getPriority() - _priorityOffset; if(index >= _priorities) { index = _priorities-1; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java new file mode 100644 index 0000000000..73d54e9247 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.pool.ReadWriteRunnable; +import org.apache.qpid.server.logging.actors.CurrentActor; + +public class QueueRunner implements ReadWriteRunnable +{ + private static final Logger _logger = Logger.getLogger(QueueRunner.class); + + private String _name; + private SimpleAMQQueue _queue; + + public QueueRunner(SimpleAMQQueue queue, long count) + { + _queue = queue; + _name = "QueueRunner-" + count + "-" + _queue.getLogActor(); + } + + public void run() + { + String originalName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName(_name); + CurrentActor.set(_queue.getLogActor()); + + _queue.processQueue(this); + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + Thread.currentThread().setName(originalName); + } + } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } + + public String toString() + { + return _name; + } +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 4d0bef90ae..bcb0c12420 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1230,10 +1230,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public void deliverAsync() { - Runner runner = new Runner(_stateChangeCount.incrementAndGet()); + QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1246,52 +1245,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(new SubFlushRunner(sub)); } - - private class Runner implements ReadWriteRunnable - { - String _name; - public Runner(long count) - { - _name = "QueueRunner-" + count + "-" + _logActor; - } - - public void run() - { - String originalName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName(_name); - CurrentActor.set(_logActor); - - processQueue(this); - } - catch (AMQException e) - { - _logger.error(e); - } - finally - { - CurrentActor.remove(); - Thread.currentThread().setName(originalName); - } - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - - public String toString() - { - return _name; - } - } - private class SubFlushRunner implements ReadWriteRunnable { private final Subscription _sub; @@ -1529,7 +1482,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener * @param runner the Runner to schedule * @throws AMQException */ - private void processQueue(Runnable runner) throws AMQException + public void processQueue(QueueRunner runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; @@ -1897,4 +1850,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost."); } } + + public LogActor getLogActor() + { + return _logActor; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java index b09283b11f..791dfd101a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java @@ -120,8 +120,8 @@ public class TransientMessageData public boolean isPersistent() { - return _contentHeaderBody.properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == + return _contentHeaderBody.getProperties() instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index 80c642a734..f9b859ad97 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -361,7 +361,7 @@ public class Show extends AbstractCommand try { - headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties); + headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().getProperties()); } catch (AMQException e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 250b7470a4..5a2add4910 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -227,7 +227,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase static ContentHeaderBody getContentHeader(FieldTable headers) { ContentHeaderBody header = new ContentHeaderBody(); - header.properties = getProperties(headers); + header.setProperties(getProperties(headers)); return header; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index aff7af6952..d4f907ce79 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -95,7 +95,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest AMQMessage msg = super.createMessage(id); BasicContentHeaderProperties props = new BasicContentHeaderProperties(); props.setPriority(i); - msg.getContentHeaderBody().properties = props; + msg.getContentHeaderBody().setProperties(props); return msg; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index f7aefb1900..d9f143f8b8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -448,8 +448,8 @@ public class AMQQueueMBeanTest extends TestCase ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes - contentHeaderBody.properties = new BasicContentHeaderProperties(); - ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); + contentHeaderBody.setProperties(new BasicContentHeaderProperties()); + ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1)); IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession); msg.setContentHeaderBody(contentHeaderBody); return msg; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 79f7d75aa9..fbf1ac3591 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -144,7 +144,7 @@ public class AckTest extends TestCase //This is DeliveryMode.PERSISTENT b.setDeliveryMode((byte) 2); ContentHeaderBody cb = new ContentHeaderBody(); - cb.properties = b; + cb.setProperties(b); msg.setContentHeaderBody(cb); } else diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 53bf2dd852..2151e2a396 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -31,19 +31,15 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -405,8 +401,8 @@ public class SimpleAMQQueueTest extends TestCase NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.properties = new BasicContentHeaderProperties(); - ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + contentHeaderBody.setProperties(new BasicContentHeaderProperties()); + ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); @@ -432,6 +428,112 @@ public class SimpleAMQQueueTest extends TestCase assertNull(data); } + /** + * processQueue() is used when asynchronously delivering messages to + * subscriptions which could not be delivered immediately during the + * enqueue() operation. + * + * A defect within the method would mean that delivery of these messages may + * not occur should the Runner stop before all messages have been processed. + * Such a defect was discovered when Selectors were used such that one and + * only one subscription can/will accept any given messages, but multiple + * subscriptions are present, and one of the earlier subscriptions receives + * more messages than the others. + * + * This test is to validate that the processQueue() method is able to + * correctly deliver all of the messages present for asynchronous delivery + * to subscriptions in such a scenario. + */ + public void testProcessQueueWithUniqueSelectors() throws Exception + { + StoreContext testContext = new StoreContext(); + TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); + SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString( + "testQueue"), false, new AMQShortString("testOwner"), false, + _virtualHost, factory) + { + @Override + public void deliverAsync(Subscription sub) + { + // do nothing, i.e prevent deliveries by the SubFlushRunner + // when registering the new subscriptions + } + }; + + // retrieve the QueueEntryList the queue creates and insert the test + // messages, thus avoiding straight-through delivery attempts during + //enqueue() process. + QueueEntryList list = factory.getQueueEntryList(); + assertNotNull("QueueEntryList should have been created", list); + + QueueEntry msg1 = list.add(createMessage(1L), testContext); + QueueEntry msg2 = list.add(createMessage(2L), testContext); + QueueEntry msg3 = list.add(createMessage(3L), testContext); + QueueEntry msg4 = list.add(createMessage(4L), testContext); + QueueEntry msg5 = list.add(createMessage(5L), testContext); + + // Create lists of the entries each subscription should be interested + // in.Bias over 50% of the messages to the first subscription so that + // the later subscriptions reject them and report being done before + // the first subscription as the processQueue method proceeds. + List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3); + List<QueueEntry> msgListSub2 = createEntriesList(msg4); + List<QueueEntry> msgListSub3 = createEntriesList(msg5); + + MockSubscription sub1 = new MockSubscription(msgListSub1); + MockSubscription sub2 = new MockSubscription(msgListSub2); + MockSubscription sub3 = new MockSubscription(msgListSub3); + + // register the subscriptions + testQueue.registerSubscription(sub1, false); + testQueue.registerSubscription(sub2, false); + testQueue.registerSubscription(sub3, false); + + //check that no messages have been delivered to the + //subscriptions during registration + assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); + assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); + assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); + + // call processQueue to deliver the messages + testQueue.processQueue(new QueueRunner(testQueue, 1) + { + @Override + public void run() + { + // we dont actually want/need this runner to do any work + // because we we are already doing it! + } + }); + + // check expected messages delivered to correct consumers + verifyRecievedMessages(msgListSub1, sub1.getMessages()); + verifyRecievedMessages(msgListSub2, sub2.getMessages()); + verifyRecievedMessages(msgListSub3, sub3.getMessages()); + } + + private List<QueueEntry> createEntriesList(QueueEntry... entries) + { + ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>(); + for (QueueEntry entry : entries) + { + entriesList.add(entry); + } + return entriesList; + } + + private void verifyRecievedMessages(List<QueueEntry> expected, + List<QueueEntry> delivered) + { + assertEquals("Consumer did not receive the expected number of messages", + expected.size(), delivered.size()); + + for (QueueEntry msg : expected) + { + assertTrue("Consumer did not recieve msg: " + + msg.getMessage().getMessageId(), delivered.contains(msg)); + } + } // FIXME: move this to somewhere useful private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) @@ -495,4 +597,20 @@ public class SimpleAMQQueueTest extends TestCase AMQMessage messageA = new TestMessage(id, id, info, new StoreContext()); return messageA; } + + class TestSimpleQueueEntryListFactory implements QueueEntryListFactory + { + QueueEntryList _list; + + public QueueEntryList createQueueEntryList(AMQQueue queue) + { + _list = new SimpleQueueEntryList(queue); + return _list; + } + + public QueueEntryList getQueueEntryList() + { + return _list; + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 7d337eed30..50cbb03353 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -366,7 +366,7 @@ public class MessageStoreTest extends TestCase headerBody.classId = BasicConsumeBodyImpl.CLASS_ID; headerBody.bodySize = 0; - headerBody.properties = properties; + headerBody.setProperties(properties); try { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 2346660d25..9eb05895fa 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -105,7 +105,7 @@ public class TestReferenceCounting extends TestCase ContentHeaderBody chb = new ContentHeaderBody(); BasicContentHeaderProperties bchp = new BasicContentHeaderProperties(); bchp.setDeliveryMode((byte)2); - chb.properties = bchp; + chb.setProperties(bchp); return chb; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 673d3f1676..f8e729ed55 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -25,12 +25,12 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -46,11 +46,21 @@ public class MockSubscription implements Subscription private State _state = State.ACTIVE; private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); + private List<QueueEntry> _acceptEntries = null; private static final AtomicLong idGenerator = new AtomicLong(0); // Create a simple ID that increments for ever new Subscription private final long _subscriptionID = idGenerator.getAndIncrement(); + public MockSubscription() + { + } + + public MockSubscription(List<QueueEntry> acceptEntries) + { + _acceptEntries = acceptEntries; + } + public void close() { _closed = true; @@ -101,8 +111,15 @@ public class MockSubscription implements Subscription _stateChangeLock.lock(); } - public boolean hasInterest(QueueEntry msg) + public boolean hasInterest(QueueEntry entry) { + if(_acceptEntries != null) + { + //simulate selector behaviour, only signal + //interest in the dictated queue entries + return _acceptEntries.contains(entry); + } + return true; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index f2096df9d1..8977b86112 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -193,7 +193,7 @@ public class InternalBrokerBaseCase extends TestCase //Make Message Persistent properties.setDeliveryMode((byte) 2); - _headerBody.properties = properties; + _headerBody.setProperties(properties); channel.publishContentHeader(_headerBody); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index e719c9a4b2..40c1df0c5d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -99,7 +99,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory } AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr, - (BasicContentHeaderProperties) contentHeader.properties, + (BasicContentHeaderProperties) contentHeader.getProperties(), exchange, routingKey); return createMessage(delegate, data); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index a7d41e2cde..f3d2e22bad 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -103,7 +103,7 @@ public class MessageFactoryRegistry AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) throws AMQException, JMSException { - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties(); // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over // AMQP. When the type is null, it can only be assumed that the message is a byte message. diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java index 1db7e200bd..7d1651c1bf 100644 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java @@ -258,7 +258,7 @@ public class BasicDeliverTest static ContentHeaderBody createContentHeaderBody() { ContentHeaderBody body = new ContentHeaderBody(); - body.properties = new BasicContentHeaderProperties(); + body.setProperties(new BasicContentHeaderProperties()); body.weight = 1; body.classId = 6; return body; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 83e5a7e341..30db3b8be7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -36,7 +36,7 @@ public class ContentHeaderBody implements AMQBody public long bodySize; /** must never be null */ - public ContentHeaderProperties properties; + private ContentHeaderProperties properties; public ContentHeaderBody() { @@ -128,4 +128,14 @@ public class ContentHeaderBody implements AMQBody { return new AMQFrame(channelId, body); } + + public ContentHeaderProperties getProperties() + { + return properties; + } + + public void setProperties(ContentHeaderProperties props) + { + properties = props; + } } |