diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-29 16:47:50 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-29 16:47:50 +0000 |
commit | e7accbd4f2e2b385c61ccce430676927ccb782e7 (patch) | |
tree | a1503cf66e6384c574319bf8d2e2ed56443ba02f | |
parent | 31a89a3dc54c9cc6dc48959373a7b057b469dda8 (diff) | |
download | qpid-python-e7accbd4f2e2b385c61ccce430676927ccb782e7.tar.gz |
Comments and changes from review
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@661395 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 16 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 81c8c04d6d..9d769d7582 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -196,6 +196,8 @@ public class IncomingMessage implements Filterable<RuntimeException> } else { + // TODO + int offset; final int queueCount = destinationQueues.size(); if(queueCount == 1) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 4d40b18380..a6275900d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -461,8 +461,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { if(subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended() - && sub.isActive()) + && !sub.isSuspended()) { if( !sub.wouldSuspend(entry)) { @@ -474,13 +473,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else { - // Update the last seen marker for this subscription, if some other process hasn't already - // updated it - QueueEntry queueEntryNode = sub.getLastSeenEntry(); - if(_entries.next(queueEntryNode) == entry) - { - sub.setLastSeenEntry(queueEntryNode,entry); - } deliverMessage(sub, entry); @@ -552,7 +544,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing // no-one else has updated it to something furhter on in the list - updateLastSeenEntry(sub, entry); + //TODO - check + //updateLastSeenEntry(sub, entry); return false; } @@ -1385,7 +1378,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner)) { - // we want to have one extra loop after the every subscription has reached the point where it cannot move + // we want to have one extra loop after every subscription has reached the point where it cannot move // further, just in case the advance of one subscription in the last loop allows a different subscription to // move forward in the next iteration diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 6e19f53ffe..9572c82472 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -215,23 +215,26 @@ public class MessageListenerMultiConsumerTest extends TestCase { _logger.info("Performing Receive only with two consumers on one session "); - MessageConsumer consumer2 = _clientSession1.createConsumer(_queue); - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) + int msg; + for (msg = 0; msg < (MSG_COUNT / 2); msg++) { - final Message message = _consumer1.receive(100000); + final Message message = _consumer1.receive(1000); if(message == null) { - System.out.println("!!!!!!!! " + msg); + break; } - assertTrue(message != null); + } - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) + _consumer1.close(); + _clientSession1.close(); + + for (; msg < MSG_COUNT ; msg++) { - assertTrue(consumer2.receive(10000) != null); + assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null); } } else |