diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-27 12:43:04 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-27 12:43:04 +0000 |
commit | 980643b9364f2ec7e75f9e4a391754f5db4bc24a (patch) | |
tree | a7f43779191f41bc9d8413460c302ff25a67ab76 | |
parent | 4b1cc6b00ded3584ed2f11431845de09f195ed14 (diff) | |
download | qpid-python-980643b9364f2ec7e75f9e4a391754f5db4bc24a.tar.gz |
Refactoring updates (job queue changes, enqueue collections..)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@660490 13f79535-47bb-0310-9956-ffa450edef68
20 files changed, 478 insertions, 304 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 441f88b9b6..d1bea3410b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -50,6 +50,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Collection; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -248,8 +249,10 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); } boolean routed = false; + Collection<AMQQueue> queues = new ArrayList<AMQQueue>(); for (Registration e : _bindings) { + if (e.binding.matches(headers)) { if (_logger.isDebugEnabled()) @@ -257,10 +260,12 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": delivering message with headers " + headers + " to " + e.queue.getName()); } - payload.enqueue(e.queue); + queues.add(e.queue); + routed = true; } } + payload.enqueue(queues); } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 8d3110ef18..d07501a188 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -262,8 +262,24 @@ public class TopicExchange extends AbstractExchange _filteredQueues.put(queue,newFilters); } - public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues) + public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues) { + if(queues == null) + { + if(_filteredQueues.isEmpty()) + { + return new ArrayList<AMQQueue>(_unfilteredQueues.keySet()); + } + else + { + queues = new HashSet<AMQQueue>(); + } + } + else if(!(queues instanceof Set)) + { + queues = new HashSet<AMQQueue>(queues); + } + queues.addAll(_unfilteredQueues.keySet()); if(!_filteredQueues.isEmpty()) { @@ -621,11 +637,11 @@ public class TopicExchange extends AbstractExchange } else { - Set<AMQQueue> queues = new HashSet<AMQQueue>(); + Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>(); for(TopicMatcherResult result : results) { - ((TopicExchangeResult)result).processMessage(message, queues); + queues = ((TopicExchangeResult)result).processMessage(message, queues); } return queues; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 0a6bfb15e6..bdb16d0fcb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -444,7 +444,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public boolean channelAwaitingClosure(int channelId) { - return _closingChannelsList.contains(channelId); + return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId); } public void addChannel(AMQChannel channel) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 05533e0d2d..0e5e7aa68c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -136,6 +136,11 @@ public class AMQMessage implements Filterable<AMQException> } } + public void clearStoreContext() + { + _storeContext = new StoreContext(); + } + public StoreContext getStoreContext() { return _storeContext; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 780cd49834..570bd97a28 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -58,15 +58,13 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void unregisterSubscription(final Subscription subscription) throws AMQException; + int getConsumerCount(); int getActiveConsumerCount(); boolean isUnused(); - - - boolean isEmpty(); int getMessageCount(); @@ -80,10 +78,27 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> long getOldestMessageArrivalTime(); - boolean isDeleted(); + int delete() throws AMQException; + + + QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; + + void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; + + void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; + + + + boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; + + + + void addQueueDeleteTask(final Task task); + + List<QueueEntry> getMessagesOnTheQueue(); List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId); @@ -91,7 +106,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> QueueEntry getMessageOnTheQueue(long messageId); - void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext); @@ -99,9 +113,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext); - void quiesce(); - void start(); long getMaximumMessageSize(); @@ -132,27 +144,14 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> - int delete() throws AMQException; - - - QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; - - void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; - - void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; - - void deliverAsync(); - - void addQueueDeleteTask(final Task task); - - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - void removeExpiredIfNoSubscribers() throws AMQException; Set<NotificationCheck> getNotificationChecks(); void flushSubscription(final Subscription sub) throws AMQException; + void deliverAsync(final Subscription sub); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 68b429efc6..81c8c04d6d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -32,10 +32,8 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.AMQException; -import org.apache.qpid.common.ClientProperties; import org.apache.log4j.Logger; -import java.util.ArrayList; import java.util.Collection; public class IncomingMessage implements Filterable<RuntimeException> @@ -198,19 +196,53 @@ public class IncomingMessage implements Filterable<RuntimeException> } else { + int offset; + final int queueCount = destinationQueues.size(); + if(queueCount == 1) + { + offset = 0; + } + else + { + offset = ((int)(message.getMessageId().longValue())) % queueCount; + if(offset < 0) + { + offset = -offset; + } + } + + int i = 0; for (AMQQueue q : destinationQueues) { - // Increment the references to this message for each queue delivery. - message.incrementReference(); - // normal deliver so add this message at the end. - _txnContext.deliver(q, message); + if(++i > offset) + { + // Increment the references to this message for each queue delivery. + message.incrementReference(); + // normal deliver so add this message at the end. + _txnContext.deliver(q, message); + } } + i = 0; + if(offset != 0) + { + for (AMQQueue q : destinationQueues) + { + if(i++ < offset) + { + // Increment the references to this message for each queue delivery. + message.incrementReference(); + // normal deliver so add this message at the end. + _txnContext.deliver(q, message); + } + } + } + } // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery _txnContext.messageFullyReceived(isPersistent()); - + message.clearStoreContext(); return message; } finally @@ -257,16 +289,6 @@ public class IncomingMessage implements Filterable<RuntimeException> return _messagePublishInfo.isImmediate(); } - - public void enqueue(final AMQQueue q) throws AMQException - { - if(_destinationQueues == null) - { - _destinationQueues = new ArrayList<AMQQueue>(); - } - _destinationQueues.add(q); - } - public ContentHeaderBody getContentHeaderBody() { return _contentHeaderBody; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index fea1db97b3..d26d6af7b2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -42,7 +42,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private final AMQMessage _message; + private AMQMessage _message; private Set<Subscription> _rejectedBy = null; @@ -376,7 +376,7 @@ public class QueueEntryImpl implements QueueEntry if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { - _queueEntryList.advanceHead(); + _queueEntryList.advanceHead(); return true; } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 16d24e74ee..5baf48245c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -11,6 +11,8 @@ import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.AMQException; +import org.apache.qpid.pool.ReadWriteRunnable; +import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.configuration.Configured; import org.apache.log4j.Logger; @@ -21,8 +23,6 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -84,7 +84,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener protected final SubscriptionList _subscriptionList = new SubscriptionList(this); private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); - private boolean _exclusiveSubscriber; + private volatile Subscription _exclusiveSubscriber; private final QueueEntryList _entries; @@ -116,9 +116,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") public long _minimumAlertRepeatGap; + + + private static final int MAX_ASYNC_DELIVERIES = 10; + + private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); private AtomicReference _asynchronousRunner = new AtomicReference(null); private AtomicInteger _deliveredMessages = new AtomicInteger(); @@ -155,7 +161,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _virtualHost = virtualHost; _entries = entryListFactory.createQueueEntryList(this); - _asyncDelivery = AsyncDeliveryConfig.getAsyncDeliveryExecutor(); + _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + + AsyncDeliveryConfig.getAsyncDeliveryExecutor(); try { @@ -235,11 +243,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // ------ Manage Subscriptions - public void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException + public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException { - if(_exclusiveSubscriber) + if(isExclusiveSubscriber()) { throw new ExistingExclusiveSubscription(); } @@ -249,7 +257,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new ExistingSubscriptionPreventsExclusive(); } - _exclusiveSubscriber = exclusive; + setExclusiveSubscriber(subscription); _activeSubscriberCount.incrementAndGet(); subscription.setStateListener(this); @@ -274,7 +282,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void unregisterSubscription(final Subscription subscription) throws AMQException + public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException { if(subscription == null) { @@ -289,9 +297,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { subscription.close(); // No longer can the queue have an exclusive consumer - _exclusiveSubscriber = false; + setExclusiveSubscriber(null); + QueueEntry lastSeen; + + while((lastSeen = subscription.getLastSeenEntry()) != null) + { + subscription.setLastSeenEntry(lastSeen, null); + } + @@ -329,83 +344,84 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _totalMessagesReceived.incrementAndGet(); - QueueEntry entry = _entries.add(message); + QueueEntry entry; + Subscription exclusiveSub = _exclusiveSubscriber; + if(exclusiveSub != null) + { + exclusiveSub.getSendLock(); - /* + try + { + entry = _entries.add(message); + deliverToSubscription(exclusiveSub, entry); - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - */ - SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); - SubscriptionList.SubscriptionNode nextNode = node.getNext(); - if(nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } - while(nextNode != null) - { - if(_lastSubscriptionNode.compareAndSet(node, nextNode)) - { - break; - } - else - { - node = _lastSubscriptionNode.get(); - nextNode = node.getNext(); - if(nextNode == null) + // where there is more than one producer there's a reasonable chance that even though there is + // no "queueing" we do not deliver because we get an interleving of _entries.add and + // deliverToSubscription between threads. Therefore have one more try. + if(!(entry.isAcquired() || entry.isDeleted())) { - nextNode = _subscriptionList.getHead().getNext(); + deliverToSubscription(exclusiveSub, entry); } } + finally + { + exclusiveSub.releaseSendLock(); + } } + else + { + entry = _entries.add(message); + /* + iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; - - while(!entry.isAcquired() && loops != 0) - { + */ + SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); + SubscriptionList.SubscriptionNode nextNode = node.getNext(); if(nextNode == null) { - loops--; - nextNode = _subscriptionList.getHead(); + nextNode = _subscriptionList.getHead().getNext(); } - else + while(nextNode != null) { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - synchronized(sub.getSendLock()) + if(_lastSubscriptionNode.compareAndSet(node, nextNode)) { - if(subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended() - && sub.isActive()) + break; + } + else + { + node = _lastSubscriptionNode.get(); + nextNode = node.getNext(); + if(nextNode == null) { - if( !sub.wouldSuspend(entry)) - { - if(!sub.isBrowser() && !entry.acquire(sub)) - { - sub.restoreCredit(entry); - } - else - { - QueueEntry queueEntryNode = sub.getLastSeenEntry(); - if(_entries.next(queueEntryNode) == entry) - { - sub.setLastSeenEntry(queueEntryNode,entry); - } - - deliverMessage(sub, entry); - - } - } + nextNode = _subscriptionList.getHead().getNext(); } } } - nextNode = nextNode.getNext(); - } + // always do one extra loop after we believe we've finished + // this catches the case where we *just* miss an update + int loops = 2; + + while(!entry.isAcquired() && loops != 0) + { + if(nextNode == null) + { + loops--; + nextNode = _subscriptionList.getHead(); + } + else + { + // if subscription at end, and active, offer + Subscription sub = nextNode.getSubscription(); + deliverToSubscription(sub, entry); + } + nextNode = nextNode.getNext(); + + } + } if(entry.immediateAndNotDelivered()) @@ -413,7 +429,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener dequeue(storeContext, entry); entry.dispose(storeContext); } - else if(!entry.isAcquired()) + else if(!(entry.isAcquired() || entry.isDeleted())) { checkSubscriptionsNotAheadOfDelivery(entry); @@ -435,6 +451,42 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + private void deliverToSubscription(final Subscription sub, final QueueEntry entry) + throws AMQException + { + sub.getSendLock(); + try + { + if(subscriptionReadyAndHasInterest(sub, entry) + && !sub.isSuspended() + && sub.isActive()) + { + if( !sub.wouldSuspend(entry)) + { + if(!sub.isBrowser() && !entry.acquire(sub)) + { + sub.restoreCredit(entry); + } + else + { + QueueEntry queueEntryNode = sub.getLastSeenEntry(); + if(_entries.next(queueEntryNode) == entry) + { + sub.setLastSeenEntry(queueEntryNode,entry); + } + + deliverMessage(sub, entry); + + } + } + } + } + finally + { + sub.releaseSendLock(); + } + } + protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) { // This method is only required for queues which mess with ordering @@ -588,7 +640,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message entry to resend and move back the subscription pointer. */ - synchronized(subscription.getSendLock()) + subscription.getSendLock(); + try { if(!subscription.isClosed()) { @@ -600,6 +653,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return false; } } + finally + { + subscription.releaseSendLock(); + } } @@ -703,7 +760,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _activeSubscriberCount.incrementAndGet(); } - deliverAsync(); + deliverAsync(sub); } } @@ -722,6 +779,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _atomicQueueSize; } + private boolean isExclusiveSubscriber() + { + return _exclusiveSubscriber != null; + } + + private void setExclusiveSubscriber(Subscription exclusiveSubscriber) + { + _exclusiveSubscriber = exclusiveSubscriber; + } + public static interface QueueEntryFilter { public boolean accept(QueueEntry entry); @@ -999,22 +1066,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void quiesce() - { - _quiesced.set(true); - } - - public void start() - { - if(_quiesced.compareAndSet(true,false)) - { - deliverAsync(); - } - } - - - - // ------ Management functions @@ -1088,6 +1139,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } _deleteTaskList.clear(); + ReferenceCountingExecutorService.getInstance().releaseExecutorService(); } return getMessageCount(); @@ -1098,13 +1150,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _stateChangeCount.incrementAndGet(); - if(_asynchronousRunner.get() == null) - { - _asyncDelivery.execute(new Runner()); + Runner runner = new Runner(); + + if(_asynchronousRunner.compareAndSet(null,runner)) + { + _asyncDelivery.execute(runner); } } - private class Runner implements Runnable + public void deliverAsync(Subscription sub) + { + _asyncDelivery.execute(new SubFlushRunner(sub)); + } + + private class Runner implements ReadWriteRunnable { public void run() { @@ -1118,21 +1177,77 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } + } + + + private class SubFlushRunner implements ReadWriteRunnable + { + private final Subscription _sub; + + + public SubFlushRunner(Subscription sub) + { + _sub = sub; + } + + public void run() + { + boolean complete = false; + try + { + complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES); + + } + catch (AMQException e) + { + _logger.error(e); + } + if(!complete && !_sub.isSuspended()) + { + _asyncDelivery.execute(this); + } + + } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } } public void flushSubscription(Subscription sub) throws AMQException { + flushSubscription(sub, Long.MAX_VALUE); + } + + public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException + { boolean atTail = false; - while(sub.isActive() && !atTail) + + while(!sub.isSuspended() && !atTail && deliveries != 0) { - synchronized(sub.getSendLock()) + sub.getSendLock(); + try { if(sub.isActive()) { - QueueEntry node = moveSubscriptionToNextNode(sub); - if(!(node.isAcquired() || node.isDeleted())) { if(!sub.isSuspended()) @@ -1148,6 +1263,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else { + deliveries--; deliverMessage(sub, node); if(sub.isBrowser()) @@ -1159,8 +1275,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.setLastSeenEntry(node, newNode); node = sub.getLastSeenEntry(); } - - } } @@ -1180,13 +1294,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } } + } atTail = (_entries.next(node) == null); } } + finally + { + sub.releaseSendLock(); + } } + + if(!isExclusiveSubscriber()) + { + advanceAllSubscriptions(); + } + + return atTail; + } + + protected void advanceAllSubscriptions() throws AMQException + { + SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + while(subscriberIter.advance()) + { + SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); + Subscription sub = subNode.getSubscription(); + moveSubscriptionToNextNode(sub); + } } private QueueEntry moveSubscriptionToNextNode(final Subscription sub) @@ -1227,8 +1364,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean deliveryIncomplete = true; int extraLoops = 1; + int deliveries = MAX_ASYNC_DELIVERIES; + + _asynchronousRunner.compareAndSet(runner,null); - while(((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner)) + while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner)) { // we want to have one extra loop after the every subscription has reached the point where it cannot move // further, just in case the advance of one subscription in the last loop allows a different subscription to @@ -1251,17 +1391,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener Subscription sub = subscriptionIter.getNode().getSubscription(); if(sub != null) { - synchronized(sub.getSendLock()) + sub.getSendLock(); + try { + QueueEntry node = moveSubscriptionToNextNode(sub); + if(sub.isActive()) { boolean advanced = false; + boolean subActive = false; - QueueEntry node = moveSubscriptionToNextNode(sub); if(!(node.isAcquired() || node.isDeleted())) { if(!sub.isSuspended()) { + subActive = true; if(sub.hasInterest(node)) { if(!sub.wouldSuspend(node)) @@ -1274,6 +1418,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener else { deliverMessage(sub, node); + deliveries--; if(sub.isBrowser()) { @@ -1309,7 +1454,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } final boolean atTail = (_entries.next(node) == null); - done = done && atTail; + done = done && (!subActive || atTail); if(atTail && !advanced && sub.isAutoClose()) { @@ -1322,6 +1467,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + finally + { + sub.releaseSendLock(); + } } if(done) { @@ -1346,6 +1495,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner)) + { + _asyncDelivery.execute(runner); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index acbeae0f40..537966e3aa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -28,7 +28,6 @@ import org.apache.qpid.server.queue.QueueEntry; public interface Subscription { - boolean isActive(); public static enum State @@ -75,7 +74,7 @@ public interface Subscription boolean wouldSuspend(QueueEntry msg); Object getSendLock(); - + void releaseSendLock(); void resend(final QueueEntry entry) throws AMQException; @@ -87,4 +86,9 @@ public interface Subscription boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue); + + boolean isActive(); + + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 653c7de514..8e124c8b0c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -22,6 +22,9 @@ package org.apache.qpid.server.subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.Lock; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -62,6 +65,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final RecordDeliveryMethod _recordMethod; private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final Lock _stateChangeLock; + private final Lock _stateChangeExclusiveLock; + static final class BrowserSubscription extends SubscriptionImpl @@ -254,7 +260,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); private AMQQueue _queue; - private final AtomicBoolean _sendLock = new AtomicBoolean(false); + private final AtomicBoolean _deleted = new AtomicBoolean(false); + @@ -280,7 +287,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _deliveryMethod = deliveryMethod; _recordMethod = recordMethod; - + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + _stateChangeLock = readWriteLock.readLock(); + _stateChangeExclusiveLock = readWriteLock.writeLock(); if (arguments != null) { @@ -334,7 +343,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean isSuspended() { - return !isActive() || _channel.isSuspended() || _sendLock.get(); + return !isActive() || _channel.isSuspended() || _deleted.get(); } /** @@ -344,7 +353,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage */ public void queueDeleted(AMQQueue queue) { - _sendLock.set(true); + _deleted.set(true); // _channel.queueDeleted(queue); } @@ -435,7 +444,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { boolean closed = false; State state = getState(); - synchronized (_sendLock) + + _stateChangeExclusiveLock.lock(); + try { while(!closed && state != State.CLOSED) { @@ -451,6 +462,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } _creditManager.removeListener(this); } + finally + { + _stateChangeExclusiveLock.unlock(); + } + if (closed) { @@ -481,7 +497,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public Object getSendLock() { - return _sendLock; + _stateChangeLock.lock(); + return _deleted; + } + + public void releaseSendLock() + { + _stateChangeLock.unlock(); } public void resend(final QueueEntry entry) throws AMQException diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 9924d1c770..ca614e053a 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -44,6 +44,7 @@ import org.apache.mina.common.ByteBuffer; import javax.management.Notification; import java.util.LinkedList; +import java.util.Collections; /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ public class AMQQueueAlertTest extends TestCase @@ -303,7 +304,7 @@ public class AMQQueueAlertTest extends TestCase for (int i = 0; i < messages.length; i++) { messages[i] = message(false, size); - messages[i].enqueue(_queue); + messages[i].enqueue(Collections.singleton(_queue)); messages[i].routingComplete(_messageStore, new MessageHandleFactory()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 98f78e3d69..bf0a8a6d90 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -48,6 +48,7 @@ import org.apache.mina.common.ByteBuffer; import javax.management.JMException; import java.util.LinkedList; +import java.util.Collections; /** * Test class to test AMQQueueMBean attribtues and operations @@ -216,7 +217,7 @@ public class AMQQueueMBeanTest extends TestCase long id = msg.getMessageId(); _queue.clearQueue(_storeContext); - msg.enqueue(_queue); + msg.enqueue(Collections.singleton(_queue)); msg.routingComplete(_messageStore, new MessageHandleFactory()); msg.addContentBodyFrame(new ContentChunk() @@ -318,7 +319,7 @@ public class AMQQueueMBeanTest extends TestCase for (int i = 0; i < messageCount; i++) { IncomingMessage currentMessage = message(false, persistent); - currentMessage.enqueue(_queue); + currentMessage.enqueue(Collections.singleton(_queue)); // route header currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index f1a7d4970a..c60b9ee0cb 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -464,15 +464,49 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return false; } - if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode)) + final int hashCode = _hashCode; + + final int otherHashCode = otherString._hashCode; + + if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode)) { return false; } + final int length = _length; + + if(length != otherString._length) + { + return false; + } - return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data)) - || Arrays.equals(getBytes(),otherString.getBytes()); + final byte[] data = _data; + + final byte[] otherData = otherString._data; + + final int offset = _offset; + + final int otherOffset = otherString._offset; + + if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length) + { + return Arrays.equals(data, otherData); + } + else + { + int thisIdx = offset; + int otherIdx = otherOffset; + for(int i = length; i-- != 0; ) + { + if(!(data[thisIdx++] == otherData[otherIdx++])) + { + return false; + } + } + } + + return true; } diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java index 05141aea7b..8de0f93ce9 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java @@ -41,16 +41,16 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki private final ReentrantLock _putLock = new ReentrantLock(); - private final ConcurrentLinkedQueue<Job> _readJobQueue = new ConcurrentLinkedQueue<Job>(); + private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); - private final ConcurrentLinkedQueue<Job> _writeJobQueue = new ConcurrentLinkedQueue<Job>(); + private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>(); private class ReadWriteJobIterator implements Iterator<Runnable> { private boolean _onReads; - private Iterator<Job> _iter = _writeJobQueue.iterator(); + private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator(); public boolean hasNext() { @@ -112,12 +112,12 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki public boolean offer(final Runnable runnable) { - final Job job = (Job) runnable; + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; final ReentrantLock putLock = _putLock; putLock.lock(); try { - if(job.isReadJob()) + if(job.isRead()) { _readJobQueue.offer(job); } @@ -147,13 +147,13 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki public void put(final Runnable runnable) throws InterruptedException { - final Job job = (Job) runnable; + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; final ReentrantLock putLock = _putLock; putLock.lock(); try { - if(job.isReadJob()) + if(job.isRead()) { _readJobQueue.offer(job); } @@ -185,13 +185,13 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException { - final Job job = (Job) runnable; + final ReadWriteRunnable job = (ReadWriteRunnable) runnable; final ReentrantLock putLock = _putLock; putLock.lock(); try { - if(job.isReadJob()) + if(job.isRead()) { _readJobQueue.offer(job); } @@ -240,7 +240,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki throw ie; } - Job job = _writeJobQueue.poll(); + ReadWriteRunnable job = _writeJobQueue.poll(); if(job == null) { job = _readJobQueue.poll(); @@ -266,7 +266,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki final AtomicInteger count = _count; long nanos = unit.toNanos(timeout); takeLock.lockInterruptibly(); - Job job = null; + ReadWriteRunnable job = null; try { @@ -322,7 +322,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki _takeLock.lock(); try { - Job job; + ReadWriteRunnable job; while((job = _writeJobQueue.peek())!= null) { c.add(job); @@ -356,7 +356,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki _takeLock.lock(); try { - Job job; + ReadWriteRunnable job; while(total<=maxElements && (job = _writeJobQueue.peek())!= null) { c.add(job); @@ -391,7 +391,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki { if(_count.get() > 0) { - Job job = _writeJobQueue.poll(); + ReadWriteRunnable job = _writeJobQueue.poll(); if(job == null) { job = _readJobQueue.poll(); @@ -417,7 +417,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki takeLock.lock(); try { - Job job = _writeJobQueue.peek(); + ReadWriteRunnable job = _writeJobQueue.peek(); if(job == null) { job = _readJobQueue.peek(); diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java new file mode 100644 index 0000000000..ad04a923e1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java @@ -0,0 +1,27 @@ +package org.apache.qpid.pool; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public interface ReadWriteRunnable extends Runnable +{ + boolean isRead(); + boolean isWrite(); +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index d99973cffb..ce9c6ae4cb 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -110,7 +110,7 @@ public class ReferenceCountingExecutorService * * @return An executor service. */ - ExecutorService acquireExecutorService() + public ExecutorService acquireExecutorService() { synchronized (_lock) { @@ -140,7 +140,7 @@ public class ReferenceCountingExecutorService * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls * to zero, the executor service is shut down. */ - void releaseExecutorService() + public void releaseExecutorService() { synchronized (_lock) { diff --git a/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java index 779d47be1c..0b7e300cec 100644 --- a/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java +++ b/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java @@ -22,6 +22,8 @@ package org.apache.qpid.extras.exchanges.diagnostic; import java.util.List; import java.util.Map; +import java.util.ArrayList; +import java.util.Collection; import javax.management.JMException; import javax.management.openmbean.OpenDataException; @@ -201,8 +203,10 @@ public class DiagnosticExchange extends AbstractExchange headers.put(key, value); ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers); AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue")); - - payload.enqueue(q); + + Collection<AMQQueue> queues = new ArrayList<AMQQueue>(); + queues.add(q); + payload.enqueue(queues); } diff --git a/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java b/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java deleted file mode 100644 index 5323ad28bf..0000000000 --- a/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.vmpipe.support; - -import org.apache.mina.common.IdleStatus; - -import java.util.HashMap; -import java.util.IdentityHashMap; -import java.util.Iterator; -import java.util.Map; - -/** - * This file is a patch to override MINA, because of the IdentityHashMap bug. Workaround to be supplied in MINA 1.0.7. - * This patched file will be removed once upgraded onto a newer MINA. - * - * Dectects idle sessions and fires <tt>sessionIdle</tt> events to them. - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - */ -public class VmPipeIdleStatusChecker -{ - private static final VmPipeIdleStatusChecker INSTANCE = new VmPipeIdleStatusChecker(); - - public static VmPipeIdleStatusChecker getInstance() - { - return INSTANCE; - } - - private final Map sessions = new HashMap(); // will use as a set - - private final Worker worker = new Worker(); - - private VmPipeIdleStatusChecker() - { - worker.start(); - } - - public void addSession(VmPipeSessionImpl session) - { - synchronized (sessions) - { - sessions.put(session, session); - } - } - - private class Worker extends Thread - { - private Worker() - { - super("VmPipeIdleStatusChecker"); - setDaemon(true); - } - - public void run() - { - for (;;) - { - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) - { } - - long currentTime = System.currentTimeMillis(); - - synchronized (sessions) - { - Iterator it = sessions.keySet().iterator(); - while (it.hasNext()) - { - VmPipeSessionImpl session = (VmPipeSessionImpl) it.next(); - if (!session.isConnected()) - { - it.remove(); - } - else - { - notifyIdleSession(session, currentTime); - } - } - } - } - } - } - - private void notifyIdleSession(VmPipeSessionImpl session, long currentTime) - { - notifyIdleSession0(session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, - Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); - notifyIdleSession0(session, currentTime, session.getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE, - Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); - notifyIdleSession0(session, currentTime, session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE, - Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); - } - - private void notifyIdleSession0(VmPipeSessionImpl session, long currentTime, long idleTime, IdleStatus status, - long lastIoTime) - { - if ((idleTime > 0) && (lastIoTime != 0) && ((currentTime - lastIoTime) >= idleTime)) - { - session.increaseIdleCount(status); - session.getFilterChain().fireSessionIdle(session, status); - } - } - -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index dae81a875d..bbd6deffd3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.util.NullApplicationRegistry; import java.util.LinkedList; import java.util.Set; +import java.util.Collections; /** * Tests that acknowledgements are handled correctly. @@ -145,7 +146,7 @@ public class AckTest extends TestCase // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription - msg.enqueue(_queue); + msg.enqueue(Collections.singleton(_queue)); msg.routingComplete(_messageStore, factory); if(msg.allContentReceived()) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 7fd46474ab..4dbb550c7c 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -91,6 +91,11 @@ public class SubscriptionTestHelper implements Subscription return new Object(); } + public void releaseSendLock() + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void resend(final QueueEntry entry) { //To change body of implemented methods use File | Settings | File Templates. |